mirror of
https://github.com/fafhrd91/actix-net
synced 2025-08-31 18:36:59 +02:00
Compare commits
22 Commits
server-v2.
...
rt-v2.5.0
Author | SHA1 | Date | |
---|---|---|---|
|
a2437eed29 | ||
|
67b357a175 | ||
|
3597af5c45 | ||
|
8891c2681e | ||
|
233c61ba08 | ||
|
161f239f12 | ||
|
7e7df2f931 | ||
|
ce8ec15eaa | ||
|
ae28ce5377 | ||
|
54d1d9e520 | ||
|
0b0cbd5388 | ||
|
443a328fb4 | ||
|
58a67ade32 | ||
|
38caa8f088 | ||
|
ed987eef06 | ||
|
3658929010 | ||
|
3f49d8ab54 | ||
|
161d1ee94b | ||
|
81ba7cafaa | ||
|
f8f51a2240 | ||
|
a2e765ea6e | ||
|
03dae6a4a4 |
@@ -14,7 +14,7 @@ ci-check = "hack --workspace --feature-powerset --exclude-features=io-uring chec
|
|||||||
ci-check-linux = "hack --workspace --feature-powerset check --tests --examples"
|
ci-check-linux = "hack --workspace --feature-powerset check --tests --examples"
|
||||||
|
|
||||||
# tests avoiding io-uring feature
|
# tests avoiding io-uring feature
|
||||||
ci-test = "hack test --workspace --exclude=actix-rt --exclude=actix-server --all-features --lib --tests --no-fail-fast -- --nocapture"
|
ci-test = " hack --feature-powerset --exclude=actix-rt --exclude=actix-server --exclude-features=io-uring test --workspace --lib --tests --no-fail-fast -- --nocapture"
|
||||||
ci-test-rt = " hack --feature-powerset --exclude-features=io-uring test --package=actix-rt --lib --tests --no-fail-fast -- --nocapture"
|
ci-test-rt = " hack --feature-powerset --exclude-features=io-uring test --package=actix-rt --lib --tests --no-fail-fast -- --nocapture"
|
||||||
ci-test-server = "hack --feature-powerset --exclude-features=io-uring test --package=actix-server --lib --tests --no-fail-fast -- --nocapture"
|
ci-test-server = "hack --feature-powerset --exclude-features=io-uring test --package=actix-server --lib --tests --no-fail-fast -- --nocapture"
|
||||||
|
|
||||||
|
13
.github/workflows/ci.yml
vendored
13
.github/workflows/ci.yml
vendored
@@ -196,13 +196,6 @@ jobs:
|
|||||||
- name: Cache Dependencies
|
- name: Cache Dependencies
|
||||||
uses: Swatinem/rust-cache@v1.3.0
|
uses: Swatinem/rust-cache@v1.3.0
|
||||||
|
|
||||||
- name: Install cargo-hack
|
- name: doc tests io-uring
|
||||||
uses: actions-rs/cargo@v1
|
run: |
|
||||||
with:
|
sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/usr/share/rust/.cargo/bin && RUSTUP_TOOLCHAIN=nightly cargo ci-doctest"
|
||||||
command: install
|
|
||||||
args: cargo-hack
|
|
||||||
|
|
||||||
- name: doc tests
|
|
||||||
uses: actions-rs/cargo@v1
|
|
||||||
timeout-minutes: 40
|
|
||||||
with: { command: ci-doctest }
|
|
||||||
|
@@ -1,6 +1,9 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
## Unreleased - 2021-xx-xx
|
## Unreleased - 2021-xx-xx
|
||||||
|
|
||||||
|
|
||||||
|
## 0.4.1 - 2021-11-05
|
||||||
* Added `LinesCodec.` [#338]
|
* Added `LinesCodec.` [#338]
|
||||||
* `Framed::poll_ready` flushes when the buffer is full. [#409]
|
* `Framed::poll_ready` flushes when the buffer is full. [#409]
|
||||||
|
|
||||||
|
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actix-codec"
|
name = "actix-codec"
|
||||||
version = "0.4.0"
|
version = "0.4.1"
|
||||||
authors = [
|
authors = [
|
||||||
"Nikolay Kim <fafhrd91@gmail.com>",
|
"Nikolay Kim <fafhrd91@gmail.com>",
|
||||||
"Rob Ede <robjtede@icloud.com>",
|
"Rob Ede <robjtede@icloud.com>",
|
||||||
|
@@ -1,6 +1,15 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
## Unreleased - 2021-xx-xx
|
## Unreleased - 2021-xx-xx
|
||||||
|
|
||||||
|
|
||||||
|
## 2.5.0 - 2021-11-22
|
||||||
|
* Add `System::run_with_code` to allow retrieving the exit code on stop. [#411]
|
||||||
|
|
||||||
|
[#411]: https://github.com/actix/actix-net/pull/411
|
||||||
|
|
||||||
|
|
||||||
|
## 2.4.0 - 2021-11-05
|
||||||
* Add `Arbiter::try_current` for situations where thread may or may not have Arbiter context. [#408]
|
* Add `Arbiter::try_current` for situations where thread may or may not have Arbiter context. [#408]
|
||||||
* Start io-uring with `System::new` when feature is enabled. [#395]
|
* Start io-uring with `System::new` when feature is enabled. [#395]
|
||||||
|
|
||||||
@@ -105,7 +114,7 @@
|
|||||||
[#129]: https://github.com/actix/actix-net/issues/129
|
[#129]: https://github.com/actix/actix-net/issues/129
|
||||||
|
|
||||||
|
|
||||||
## 1.1.0 - 2020-04-08 (YANKED)
|
## 1.1.0 - 2020-04-08 _(YANKED)_
|
||||||
* Expose `System::is_set` to check if current system has ben started [#99]
|
* Expose `System::is_set` to check if current system has ben started [#99]
|
||||||
* Add `Arbiter::is_running` to check if event loop is running [#124]
|
* Add `Arbiter::is_running` to check if event loop is running [#124]
|
||||||
* Add `Arbiter::local_join` associated function
|
* Add `Arbiter::local_join` associated function
|
||||||
|
@@ -1,9 +1,10 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actix-rt"
|
name = "actix-rt"
|
||||||
version = "2.3.0"
|
version = "2.5.0"
|
||||||
authors = [
|
authors = [
|
||||||
"Nikolay Kim <fafhrd91@gmail.com>",
|
"Nikolay Kim <fafhrd91@gmail.com>",
|
||||||
"Rob Ede <robjtede@icloud.com>",
|
"Rob Ede <robjtede@icloud.com>",
|
||||||
|
"fakeshadow <24548779@qq.com>",
|
||||||
]
|
]
|
||||||
description = "Tokio-based single-threaded async runtime for the Actix ecosystem"
|
description = "Tokio-based single-threaded async runtime for the Actix ecosystem"
|
||||||
keywords = ["async", "futures", "io", "runtime"]
|
keywords = ["async", "futures", "io", "runtime"]
|
||||||
|
@@ -3,11 +3,11 @@
|
|||||||
> Tokio-based single-threaded async runtime for the Actix ecosystem.
|
> Tokio-based single-threaded async runtime for the Actix ecosystem.
|
||||||
|
|
||||||
[](https://crates.io/crates/actix-rt)
|
[](https://crates.io/crates/actix-rt)
|
||||||
[](https://docs.rs/actix-rt/2.3.0)
|
[](https://docs.rs/actix-rt/2.5.0)
|
||||||
[](https://blog.rust-lang.org/2020/03/12/Rust-1.46.html)
|
[](https://blog.rust-lang.org/2020/03/12/Rust-1.46.html)
|
||||||

|

|
||||||
<br />
|
<br />
|
||||||
[](https://deps.rs/crate/actix-rt/2.3.0)
|
[](https://deps.rs/crate/actix-rt/2.5.0)
|
||||||

|

|
||||||
[](https://discord.gg/WghFtEH6Hb)
|
[](https://discord.gg/WghFtEH6Hb)
|
||||||
|
|
||||||
|
@@ -36,6 +36,9 @@
|
|||||||
//! # `io-uring` Support
|
//! # `io-uring` Support
|
||||||
//! There is experimental support for using io-uring with this crate by enabling the
|
//! There is experimental support for using io-uring with this crate by enabling the
|
||||||
//! `io-uring` feature. For now, it is semver exempt.
|
//! `io-uring` feature. For now, it is semver exempt.
|
||||||
|
//!
|
||||||
|
//! Note that there are currently some unimplemented parts of using `actix-rt` with `io-uring`.
|
||||||
|
//! In particular, when running a `System`, only `System::block_on` is supported.
|
||||||
|
|
||||||
#![deny(rust_2018_idioms, nonstandard_style)]
|
#![deny(rust_2018_idioms, nonstandard_style)]
|
||||||
#![allow(clippy::type_complexity)]
|
#![allow(clippy::type_complexity)]
|
||||||
|
@@ -67,11 +67,7 @@ impl System {
|
|||||||
let sys_ctrl = SystemController::new(sys_rx, stop_tx);
|
let sys_ctrl = SystemController::new(sys_rx, stop_tx);
|
||||||
rt.spawn(sys_ctrl);
|
rt.spawn(sys_ctrl);
|
||||||
|
|
||||||
SystemRunner {
|
SystemRunner { rt, stop_rx }
|
||||||
rt,
|
|
||||||
stop_rx,
|
|
||||||
system,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -94,7 +90,7 @@ impl System {
|
|||||||
where
|
where
|
||||||
F: Fn() -> tokio::runtime::Runtime,
|
F: Fn() -> tokio::runtime::Runtime,
|
||||||
{
|
{
|
||||||
unimplemented!("System::with_tokio_rt is not implemented yet")
|
unimplemented!("System::with_tokio_rt is not implemented for io-uring feature yet")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -175,38 +171,37 @@ impl System {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(not(feature = "io-uring"))]
|
|
||||||
/// Runner that keeps a [System]'s event loop alive until stop message is received.
|
/// Runner that keeps a [System]'s event loop alive until stop message is received.
|
||||||
|
#[cfg(not(feature = "io-uring"))]
|
||||||
#[must_use = "A SystemRunner does nothing unless `run` is called."]
|
#[must_use = "A SystemRunner does nothing unless `run` is called."]
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct SystemRunner {
|
pub struct SystemRunner {
|
||||||
rt: crate::runtime::Runtime,
|
rt: crate::runtime::Runtime,
|
||||||
stop_rx: oneshot::Receiver<i32>,
|
stop_rx: oneshot::Receiver<i32>,
|
||||||
#[allow(dead_code)]
|
|
||||||
system: System,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(not(feature = "io-uring"))]
|
#[cfg(not(feature = "io-uring"))]
|
||||||
impl SystemRunner {
|
impl SystemRunner {
|
||||||
/// Starts event loop and will return once [System] is [stopped](System::stop).
|
/// Starts event loop and will return once [System] is [stopped](System::stop).
|
||||||
pub fn run(self) -> io::Result<()> {
|
pub fn run(self) -> io::Result<()> {
|
||||||
|
let exit_code = self.run_with_code()?;
|
||||||
|
|
||||||
|
match exit_code {
|
||||||
|
0 => Ok(()),
|
||||||
|
nonzero => Err(io::Error::new(
|
||||||
|
io::ErrorKind::Other,
|
||||||
|
format!("Non-zero exit code: {}", nonzero),
|
||||||
|
)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Runs the event loop until [stopped](System::stop_with_code), returning the exit code.
|
||||||
|
pub fn run_with_code(self) -> io::Result<i32> {
|
||||||
let SystemRunner { rt, stop_rx, .. } = self;
|
let SystemRunner { rt, stop_rx, .. } = self;
|
||||||
|
|
||||||
// run loop
|
// run loop
|
||||||
match rt.block_on(stop_rx) {
|
rt.block_on(stop_rx)
|
||||||
Ok(code) => {
|
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
|
||||||
if code != 0 {
|
|
||||||
Err(io::Error::new(
|
|
||||||
io::ErrorKind::Other,
|
|
||||||
format!("Non-zero exit code: {}", code),
|
|
||||||
))
|
|
||||||
} else {
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Runs the provided future, blocking the current thread until the future completes.
|
/// Runs the provided future, blocking the current thread until the future completes.
|
||||||
@@ -216,8 +211,8 @@ impl SystemRunner {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "io-uring")]
|
|
||||||
/// Runner that keeps a [System]'s event loop alive until stop message is received.
|
/// Runner that keeps a [System]'s event loop alive until stop message is received.
|
||||||
|
#[cfg(feature = "io-uring")]
|
||||||
#[must_use = "A SystemRunner does nothing unless `run` is called."]
|
#[must_use = "A SystemRunner does nothing unless `run` is called."]
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct SystemRunner;
|
pub struct SystemRunner;
|
||||||
@@ -226,7 +221,14 @@ pub struct SystemRunner;
|
|||||||
impl SystemRunner {
|
impl SystemRunner {
|
||||||
/// Starts event loop and will return once [System] is [stopped](System::stop).
|
/// Starts event loop and will return once [System] is [stopped](System::stop).
|
||||||
pub fn run(self) -> io::Result<()> {
|
pub fn run(self) -> io::Result<()> {
|
||||||
unimplemented!("SystemRunner::run is not implemented yet")
|
unimplemented!("SystemRunner::run is not implemented for io-uring feature yet");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Runs the event loop until [stopped](System::stop_with_code), returning the exit code.
|
||||||
|
pub fn run_with_code(self) -> io::Result<i32> {
|
||||||
|
unimplemented!(
|
||||||
|
"SystemRunner::run_with_code is not implemented for io-uring feature yet"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Runs the provided future, blocking the current thread until the future completes.
|
/// Runs the provided future, blocking the current thread until the future completes.
|
||||||
|
@@ -24,6 +24,15 @@ fn await_for_timer() {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(not(feature = "io-uring"))]
|
||||||
|
#[test]
|
||||||
|
fn run_with_code() {
|
||||||
|
let sys = System::new();
|
||||||
|
System::current().stop_with_code(42);
|
||||||
|
let exit_code = sys.run_with_code().expect("system stop should not error");
|
||||||
|
assert_eq!(exit_code, 42);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn join_another_arbiter() {
|
fn join_another_arbiter() {
|
||||||
let time = Duration::from_secs(1);
|
let time = Duration::from_secs(1);
|
||||||
@@ -99,8 +108,8 @@ fn wait_for_spawns() {
|
|||||||
|
|
||||||
let handle = rt.spawn(async {
|
let handle = rt.spawn(async {
|
||||||
println!("running on the runtime");
|
println!("running on the runtime");
|
||||||
// assertion panic is caught at task boundary
|
// panic is caught at task boundary
|
||||||
assert_eq!(1, 2);
|
panic!("intentional test panic");
|
||||||
});
|
});
|
||||||
|
|
||||||
assert!(rt.block_on(handle).is_err());
|
assert!(rt.block_on(handle).is_err());
|
||||||
|
@@ -1,6 +1,21 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
## Unreleased - 2021-xx-xx
|
## Unreleased - 2021-xx-xx
|
||||||
|
|
||||||
|
|
||||||
|
## 2.0.0-beta.9 - 2021-11-15
|
||||||
|
* Restore `Arbiter` support lost in `beta.8`. [#417]
|
||||||
|
|
||||||
|
[#417]: https://github.com/actix/actix-net/pull/417
|
||||||
|
|
||||||
|
|
||||||
|
## 2.0.0-beta.8 - 2021-11-05 _(YANKED)_
|
||||||
|
* Fix non-unix signal handler. [#410]
|
||||||
|
|
||||||
|
[#410]: https://github.com/actix/actix-net/pull/410
|
||||||
|
|
||||||
|
|
||||||
|
## 2.0.0-beta.7 - 2021-11-05 _(YANKED)_
|
||||||
* Server can be started in regular Tokio runtime. [#408]
|
* Server can be started in regular Tokio runtime. [#408]
|
||||||
* Expose new `Server` type whose `Future` impl resolves when server stops. [#408]
|
* Expose new `Server` type whose `Future` impl resolves when server stops. [#408]
|
||||||
* Rename `Server` to `ServerHandle`. [#407]
|
* Rename `Server` to `ServerHandle`. [#407]
|
||||||
|
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actix-server"
|
name = "actix-server"
|
||||||
version = "2.0.0-beta.6"
|
version = "2.0.0-beta.9"
|
||||||
authors = [
|
authors = [
|
||||||
"Nikolay Kim <fafhrd91@gmail.com>",
|
"Nikolay Kim <fafhrd91@gmail.com>",
|
||||||
"fakeshadow <24548779@qq.com>",
|
"fakeshadow <24548779@qq.com>",
|
||||||
@@ -18,22 +18,27 @@ path = "src/lib.rs"
|
|||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = []
|
default = []
|
||||||
io-uring = ["actix-rt/io-uring"]
|
io-uring = ["tokio-uring", "actix-rt/io-uring"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
actix-rt = { version = "2.0.0", default-features = false }
|
actix-rt = { version = "2.4.0", default-features = false }
|
||||||
actix-service = "2.0.0"
|
actix-service = "2.0.0"
|
||||||
actix-utils = "3.0.0"
|
actix-utils = "3.0.0"
|
||||||
|
|
||||||
futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }
|
futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }
|
||||||
|
futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] }
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
mio = { version = "0.7.6", features = ["os-poll", "net"] }
|
mio = { version = "0.8", features = ["os-poll", "net"] }
|
||||||
num_cpus = "1.13"
|
num_cpus = "1.13"
|
||||||
|
socket2 = "0.4.2"
|
||||||
tokio = { version = "1.5.1", features = ["sync"] }
|
tokio = { version = "1.5.1", features = ["sync"] }
|
||||||
|
|
||||||
|
# runtime for io-uring feature
|
||||||
|
tokio-uring = { version = "0.1", optional = true }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
actix-codec = "0.4.0"
|
actix-codec = "0.4.0"
|
||||||
actix-rt = "2.0.0"
|
actix-rt = "2.4.0"
|
||||||
|
|
||||||
bytes = "1"
|
bytes = "1"
|
||||||
env_logger = "0.9"
|
env_logger = "0.9"
|
||||||
|
@@ -127,7 +127,7 @@ impl Accept {
|
|||||||
let mut events = mio::Events::with_capacity(256);
|
let mut events = mio::Events::with_capacity(256);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
if let Err(e) = self.poll.poll(&mut events, None) {
|
if let Err(e) = self.poll.poll(&mut events, self.timeout) {
|
||||||
match e.kind() {
|
match e.kind() {
|
||||||
io::ErrorKind::Interrupted => {}
|
io::ErrorKind::Interrupted => {}
|
||||||
_ => panic!("Poll error: {}", e),
|
_ => panic!("Poll error: {}", e),
|
||||||
|
@@ -8,7 +8,7 @@ use crate::{
|
|||||||
server::ServerCommand,
|
server::ServerCommand,
|
||||||
service::{InternalServiceFactory, ServiceFactory, StreamNewService},
|
service::{InternalServiceFactory, ServiceFactory, StreamNewService},
|
||||||
socket::{
|
socket::{
|
||||||
MioListener, MioTcpListener, MioTcpSocket, StdSocketAddr, StdTcpListener, ToSocketAddrs,
|
create_mio_tcp_listener, MioListener, MioTcpListener, StdTcpListener, ToSocketAddrs,
|
||||||
},
|
},
|
||||||
worker::ServerWorkerConfig,
|
worker::ServerWorkerConfig,
|
||||||
Server,
|
Server,
|
||||||
@@ -112,7 +112,7 @@ impl ServerBuilder {
|
|||||||
self.max_concurrent_connections(num)
|
self.max_concurrent_connections(num)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Stop Actix system.
|
/// Stop Actix `System` after server shutdown.
|
||||||
pub fn system_exit(mut self) -> Self {
|
pub fn system_exit(mut self) -> Self {
|
||||||
self.exit = true;
|
self.exit = true;
|
||||||
self
|
self
|
||||||
@@ -242,7 +242,8 @@ impl ServerBuilder {
|
|||||||
use std::net::{IpAddr, Ipv4Addr};
|
use std::net::{IpAddr, Ipv4Addr};
|
||||||
lst.set_nonblocking(true)?;
|
lst.set_nonblocking(true)?;
|
||||||
let token = self.next_token();
|
let token = self.next_token();
|
||||||
let addr = StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
|
let addr =
|
||||||
|
crate::socket::StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
|
||||||
self.factories.push(StreamNewService::create(
|
self.factories.push(StreamNewService::create(
|
||||||
name.as_ref().to_string(),
|
name.as_ref().to_string(),
|
||||||
token,
|
token,
|
||||||
@@ -263,7 +264,7 @@ pub(super) fn bind_addr<S: ToSocketAddrs>(
|
|||||||
let mut success = false;
|
let mut success = false;
|
||||||
let mut sockets = Vec::new();
|
let mut sockets = Vec::new();
|
||||||
for addr in addr.to_socket_addrs()? {
|
for addr in addr.to_socket_addrs()? {
|
||||||
match create_tcp_listener(addr, backlog) {
|
match create_mio_tcp_listener(addr, backlog) {
|
||||||
Ok(lst) => {
|
Ok(lst) => {
|
||||||
success = true;
|
success = true;
|
||||||
sockets.push(lst);
|
sockets.push(lst);
|
||||||
@@ -283,14 +284,3 @@ pub(super) fn bind_addr<S: ToSocketAddrs>(
|
|||||||
))
|
))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn create_tcp_listener(addr: StdSocketAddr, backlog: u32) -> io::Result<MioTcpListener> {
|
|
||||||
let socket = match addr {
|
|
||||||
StdSocketAddr::V4(_) => MioTcpSocket::new_v4()?,
|
|
||||||
StdSocketAddr::V6(_) => MioTcpSocket::new_v6()?,
|
|
||||||
};
|
|
||||||
|
|
||||||
socket.set_reuseaddr(true)?;
|
|
||||||
socket.bind(addr)?;
|
|
||||||
socket.listen(backlog)
|
|
||||||
}
|
|
||||||
|
@@ -42,10 +42,12 @@ impl ServerHandle {
|
|||||||
/// Stop incoming connection processing, stop all workers and exit.
|
/// Stop incoming connection processing, stop all workers and exit.
|
||||||
pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> {
|
pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> {
|
||||||
let (tx, rx) = oneshot::channel();
|
let (tx, rx) = oneshot::channel();
|
||||||
|
|
||||||
let _ = self.cmd_tx.send(ServerCommand::Stop {
|
let _ = self.cmd_tx.send(ServerCommand::Stop {
|
||||||
graceful,
|
graceful,
|
||||||
completion: Some(tx),
|
completion: Some(tx),
|
||||||
});
|
});
|
||||||
|
|
||||||
async {
|
async {
|
||||||
let _ = rx.await;
|
let _ = rx.await;
|
||||||
}
|
}
|
||||||
|
@@ -4,7 +4,7 @@ use std::{
|
|||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
};
|
};
|
||||||
|
|
||||||
use futures_core::future::{BoxFuture, LocalBoxFuture};
|
use futures_core::future::BoxFuture;
|
||||||
|
|
||||||
// a poor man's join future. joined future is only used when starting/stopping the server.
|
// a poor man's join future. joined future is only used when starting/stopping the server.
|
||||||
// pin_project and pinned futures are overkill for this task.
|
// pin_project and pinned futures are overkill for this task.
|
||||||
@@ -61,63 +61,6 @@ impl<T> Future for JoinAll<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn join_all_local<T>(
|
|
||||||
fut: Vec<impl Future<Output = T> + 'static>,
|
|
||||||
) -> JoinAllLocal<T> {
|
|
||||||
let fut = fut
|
|
||||||
.into_iter()
|
|
||||||
.map(|f| JoinLocalFuture::LocalFuture(Box::pin(f)))
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
JoinAllLocal { fut }
|
|
||||||
}
|
|
||||||
|
|
||||||
// a poor man's join future. joined future is only used when starting/stopping the server.
|
|
||||||
// pin_project and pinned futures are overkill for this task.
|
|
||||||
pub(crate) struct JoinAllLocal<T> {
|
|
||||||
fut: Vec<JoinLocalFuture<T>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
enum JoinLocalFuture<T> {
|
|
||||||
LocalFuture(LocalBoxFuture<'static, T>),
|
|
||||||
Result(Option<T>),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> Unpin for JoinAllLocal<T> {}
|
|
||||||
|
|
||||||
impl<T> Future for JoinAllLocal<T> {
|
|
||||||
type Output = Vec<T>;
|
|
||||||
|
|
||||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
||||||
let mut ready = true;
|
|
||||||
|
|
||||||
let this = self.get_mut();
|
|
||||||
for fut in this.fut.iter_mut() {
|
|
||||||
if let JoinLocalFuture::LocalFuture(f) = fut {
|
|
||||||
match f.as_mut().poll(cx) {
|
|
||||||
Poll::Ready(t) => {
|
|
||||||
*fut = JoinLocalFuture::Result(Some(t));
|
|
||||||
}
|
|
||||||
Poll::Pending => ready = false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if ready {
|
|
||||||
let mut res = Vec::new();
|
|
||||||
for fut in this.fut.iter_mut() {
|
|
||||||
if let JoinLocalFuture::Result(f) = fut {
|
|
||||||
res.push(f.take().unwrap());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Poll::Ready(res)
|
|
||||||
} else {
|
|
||||||
Poll::Pending
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use super::*;
|
use super::*;
|
||||||
@@ -132,13 +75,4 @@ mod test {
|
|||||||
assert_eq!(Err(3), res.next().unwrap());
|
assert_eq!(Err(3), res.next().unwrap());
|
||||||
assert_eq!(Ok(9), res.next().unwrap());
|
assert_eq!(Ok(9), res.next().unwrap());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
|
||||||
async fn test_join_all_local() {
|
|
||||||
let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))];
|
|
||||||
let mut res = join_all_local(futs).await.into_iter();
|
|
||||||
assert_eq!(Ok(1), res.next().unwrap());
|
|
||||||
assert_eq!(Err(3), res.next().unwrap());
|
|
||||||
assert_eq!(Ok(9), res.next().unwrap());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@@ -132,15 +132,13 @@ impl Server {
|
|||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
// Give log information on what runtime will be used.
|
// Give log information on what runtime will be used.
|
||||||
let is_tokio = tokio::runtime::Handle::try_current().is_ok();
|
|
||||||
let is_actix = actix_rt::System::try_current().is_some();
|
let is_actix = actix_rt::System::try_current().is_some();
|
||||||
|
let is_tokio = tokio::runtime::Handle::try_current().is_ok();
|
||||||
|
|
||||||
match (is_tokio, is_actix) {
|
match (is_actix, is_tokio) {
|
||||||
(true, false) => info!("Tokio runtime found. Starting in existing Tokio runtime"),
|
(true, _) => info!("Actix runtime found; starting in Actix runtime"),
|
||||||
(_, true) => info!("Actix runtime found. Starting in Actix runtime"),
|
(_, true) => info!("Tokio runtime found; starting in existing Tokio runtime"),
|
||||||
(_, _) => info!(
|
(_, false) => panic!("Actix or Tokio runtime not found; halting"),
|
||||||
"Actix/Tokio runtime not found. Starting in newt Tokio current-thread runtime"
|
|
||||||
),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (_, name, lst) in &builder.sockets {
|
for (_, name, lst) in &builder.sockets {
|
||||||
@@ -196,11 +194,11 @@ impl Future for Server {
|
|||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
match self.as_mut().get_mut() {
|
match self.as_mut().get_mut() {
|
||||||
Server::Error(err) => Poll::Ready(Err(err
|
Self::Error(err) => Poll::Ready(Err(err
|
||||||
.take()
|
.take()
|
||||||
.expect("Server future cannot be polled after error"))),
|
.expect("Server future cannot be polled after error"))),
|
||||||
|
|
||||||
Server::Server(inner) => {
|
Self::Server(inner) => {
|
||||||
// poll Signals
|
// poll Signals
|
||||||
if let Some(ref mut signals) = inner.signals {
|
if let Some(ref mut signals) = inner.signals {
|
||||||
if let Poll::Ready(signal) = Pin::new(signals).poll(cx) {
|
if let Poll::Ready(signal) = Pin::new(signals).poll(cx) {
|
||||||
|
@@ -35,7 +35,7 @@ impl fmt::Display for Signal {
|
|||||||
/// Process signal listener.
|
/// Process signal listener.
|
||||||
pub(crate) struct Signals {
|
pub(crate) struct Signals {
|
||||||
#[cfg(not(unix))]
|
#[cfg(not(unix))]
|
||||||
signals: futures_core::future::LocalBoxFuture<'static, std::io::Result<()>>,
|
signals: futures_core::future::BoxFuture<'static, std::io::Result<()>>,
|
||||||
|
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
signals: Vec<(Signal, actix_rt::signal::unix::Signal)>,
|
signals: Vec<(Signal, actix_rt::signal::unix::Signal)>,
|
||||||
|
@@ -2,7 +2,7 @@ pub(crate) use std::net::{
|
|||||||
SocketAddr as StdSocketAddr, TcpListener as StdTcpListener, ToSocketAddrs,
|
SocketAddr as StdSocketAddr, TcpListener as StdTcpListener, ToSocketAddrs,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub(crate) use mio::net::{TcpListener as MioTcpListener, TcpSocket as MioTcpSocket};
|
pub(crate) use mio::net::TcpListener as MioTcpListener;
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
pub(crate) use {
|
pub(crate) use {
|
||||||
mio::net::UnixListener as MioUnixListener,
|
mio::net::UnixListener as MioUnixListener,
|
||||||
@@ -223,6 +223,22 @@ mod unix_impl {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn create_mio_tcp_listener(
|
||||||
|
addr: StdSocketAddr,
|
||||||
|
backlog: u32,
|
||||||
|
) -> io::Result<MioTcpListener> {
|
||||||
|
use socket2::{Domain, Protocol, Socket, Type};
|
||||||
|
|
||||||
|
let socket = Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP))?;
|
||||||
|
|
||||||
|
socket.set_reuse_address(true)?;
|
||||||
|
socket.set_nonblocking(true)?;
|
||||||
|
socket.bind(&addr.into())?;
|
||||||
|
socket.listen(backlog as i32)?;
|
||||||
|
|
||||||
|
Ok(MioTcpListener::from_std(StdTcpListener::from(socket)))
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
@@ -234,11 +250,8 @@ mod tests {
|
|||||||
assert_eq!(format!("{}", addr), "127.0.0.1:8080");
|
assert_eq!(format!("{}", addr), "127.0.0.1:8080");
|
||||||
|
|
||||||
let addr: StdSocketAddr = "127.0.0.1:0".parse().unwrap();
|
let addr: StdSocketAddr = "127.0.0.1:0".parse().unwrap();
|
||||||
let socket = MioTcpSocket::new_v4().unwrap();
|
let lst = create_mio_tcp_listener(addr, 128).unwrap();
|
||||||
socket.set_reuseaddr(true).unwrap();
|
let lst = MioListener::Tcp(lst);
|
||||||
socket.bind(addr).unwrap();
|
|
||||||
let tcp = socket.listen(128).unwrap();
|
|
||||||
let lst = MioListener::Tcp(tcp);
|
|
||||||
assert!(format!("{:?}", lst).contains("TcpListener"));
|
assert!(format!("{:?}", lst).contains("TcpListener"));
|
||||||
assert!(format!("{}", lst).contains("127.0.0.1"));
|
assert!(format!("{}", lst).contains("127.0.0.1"));
|
||||||
}
|
}
|
||||||
|
@@ -1,5 +1,4 @@
|
|||||||
use std::sync::mpsc;
|
use std::{io, net, sync::mpsc, thread};
|
||||||
use std::{io, net, thread};
|
|
||||||
|
|
||||||
use actix_rt::{net::TcpStream, System};
|
use actix_rt::{net::TcpStream, System};
|
||||||
|
|
||||||
@@ -105,12 +104,16 @@ impl TestServer {
|
|||||||
|
|
||||||
/// Get first available unused local address.
|
/// Get first available unused local address.
|
||||||
pub fn unused_addr() -> net::SocketAddr {
|
pub fn unused_addr() -> net::SocketAddr {
|
||||||
|
use socket2::{Domain, Protocol, Socket, Type};
|
||||||
|
|
||||||
let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();
|
let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();
|
||||||
let socket = mio::net::TcpSocket::new_v4().unwrap();
|
let socket =
|
||||||
socket.bind(addr).unwrap();
|
Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP)).unwrap();
|
||||||
socket.set_reuseaddr(true).unwrap();
|
socket.set_reuse_address(true).unwrap();
|
||||||
let tcp = socket.listen(1024).unwrap();
|
socket.set_nonblocking(true).unwrap();
|
||||||
tcp.local_addr().unwrap()
|
socket.bind(&addr.into()).unwrap();
|
||||||
|
socket.listen(1024).unwrap();
|
||||||
|
net::TcpListener::from(socket).local_addr().unwrap()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -147,3 +150,16 @@ impl Drop for TestServerRuntime {
|
|||||||
self.stop()
|
self.stop()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use actix_service::fn_service;
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn plain_tokio_runtime() {
|
||||||
|
let srv = TestServer::with(|| fn_service(|_sock| async move { Ok::<_, ()>(()) }));
|
||||||
|
assert!(srv.connect().is_ok());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@@ -24,7 +24,6 @@ use tokio::sync::{
|
|||||||
};
|
};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
join_all::join_all_local,
|
|
||||||
service::{BoxedServerService, InternalServiceFactory},
|
service::{BoxedServerService, InternalServiceFactory},
|
||||||
socket::MioStream,
|
socket::MioStream,
|
||||||
waker_queue::{WakerInterest, WakerQueue},
|
waker_queue::{WakerInterest, WakerQueue},
|
||||||
@@ -202,8 +201,8 @@ impl WorkerHandleServer {
|
|||||||
pub(crate) struct ServerWorker {
|
pub(crate) struct ServerWorker {
|
||||||
// UnboundedReceiver<Conn> should always be the first field.
|
// UnboundedReceiver<Conn> should always be the first field.
|
||||||
// It must be dropped as soon as ServerWorker dropping.
|
// It must be dropped as soon as ServerWorker dropping.
|
||||||
rx: UnboundedReceiver<Conn>,
|
conn_rx: UnboundedReceiver<Conn>,
|
||||||
rx2: UnboundedReceiver<Stop>,
|
stop_rx: UnboundedReceiver<Stop>,
|
||||||
counter: WorkerCounter,
|
counter: WorkerCounter,
|
||||||
services: Box<[WorkerService]>,
|
services: Box<[WorkerService]>,
|
||||||
factories: Box<[Box<dyn InternalServiceFactory>]>,
|
factories: Box<[Box<dyn InternalServiceFactory>]>,
|
||||||
@@ -212,7 +211,7 @@ pub(crate) struct ServerWorker {
|
|||||||
}
|
}
|
||||||
|
|
||||||
struct WorkerService {
|
struct WorkerService {
|
||||||
factory: usize,
|
factory_idx: usize,
|
||||||
status: WorkerServiceStatus,
|
status: WorkerServiceStatus,
|
||||||
service: BoxedServerService,
|
service: BoxedServerService,
|
||||||
}
|
}
|
||||||
@@ -234,6 +233,12 @@ enum WorkerServiceStatus {
|
|||||||
Stopped,
|
Stopped,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Default for WorkerServiceStatus {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::Unavailable
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Config for worker behavior passed down from server builder.
|
/// Config for worker behavior passed down from server builder.
|
||||||
#[derive(Debug, Clone, Copy)]
|
#[derive(Debug, Clone, Copy)]
|
||||||
pub(crate) struct ServerWorkerConfig {
|
pub(crate) struct ServerWorkerConfig {
|
||||||
@@ -277,14 +282,131 @@ impl ServerWorker {
|
|||||||
) -> io::Result<(WorkerHandleAccept, WorkerHandleServer)> {
|
) -> io::Result<(WorkerHandleAccept, WorkerHandleServer)> {
|
||||||
trace!("starting server worker {}", idx);
|
trace!("starting server worker {}", idx);
|
||||||
|
|
||||||
let (tx1, rx) = unbounded_channel();
|
let (tx1, conn_rx) = unbounded_channel();
|
||||||
let (tx2, rx2) = unbounded_channel();
|
let (tx2, stop_rx) = unbounded_channel();
|
||||||
|
|
||||||
let counter = Counter::new(config.max_concurrent_connections);
|
let counter = Counter::new(config.max_concurrent_connections);
|
||||||
|
let pair = handle_pair(idx, tx1, tx2, counter.clone());
|
||||||
|
|
||||||
let counter_clone = counter.clone();
|
// get actix system context if it is set
|
||||||
// every worker runs in it's own arbiter.
|
let actix_system = System::try_current();
|
||||||
|
|
||||||
|
// get tokio runtime handle if it is set
|
||||||
|
let tokio_handle = tokio::runtime::Handle::try_current().ok();
|
||||||
|
|
||||||
|
// service factories initialization channel
|
||||||
|
let (factory_tx, factory_rx) = std::sync::mpsc::sync_channel::<io::Result<()>>(1);
|
||||||
|
|
||||||
|
// outline of following code:
|
||||||
|
//
|
||||||
|
// if system exists
|
||||||
|
// if uring enabled
|
||||||
|
// start arbiter using uring method
|
||||||
|
// else
|
||||||
|
// start arbiter with regular tokio
|
||||||
|
// else
|
||||||
|
// if uring enabled
|
||||||
|
// start uring in spawned thread
|
||||||
|
// else
|
||||||
|
// start regular tokio in spawned thread
|
||||||
|
|
||||||
|
// every worker runs in it's own thread and tokio runtime.
|
||||||
// use a custom tokio runtime builder to change the settings of runtime.
|
// use a custom tokio runtime builder to change the settings of runtime.
|
||||||
|
|
||||||
|
match (actix_system, tokio_handle) {
|
||||||
|
(None, None) => {
|
||||||
|
panic!("No runtime detected. Start a Tokio (or Actix) runtime.");
|
||||||
|
}
|
||||||
|
|
||||||
|
// no actix system
|
||||||
|
(None, Some(rt_handle)) => {
|
||||||
|
std::thread::Builder::new()
|
||||||
|
.name(format!("actix-server worker {}", idx))
|
||||||
|
.spawn(move || {
|
||||||
|
let (worker_stopped_tx, worker_stopped_rx) = oneshot::channel();
|
||||||
|
|
||||||
|
// local set for running service init futures and worker services
|
||||||
|
let ls = tokio::task::LocalSet::new();
|
||||||
|
|
||||||
|
// init services using existing Tokio runtime (so probably on main thread)
|
||||||
|
let services = rt_handle.block_on(ls.run_until(async {
|
||||||
|
let mut services = Vec::new();
|
||||||
|
|
||||||
|
for (idx, factory) in factories.iter().enumerate() {
|
||||||
|
match factory.create().await {
|
||||||
|
Ok((token, svc)) => services.push((idx, token, svc)),
|
||||||
|
|
||||||
|
Err(err) => {
|
||||||
|
error!("Can not start worker: {:?}", err);
|
||||||
|
return Err(io::Error::new(
|
||||||
|
io::ErrorKind::Other,
|
||||||
|
format!("can not start server service {}", idx),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(services)
|
||||||
|
}));
|
||||||
|
|
||||||
|
let services = match services {
|
||||||
|
Ok(services) => {
|
||||||
|
factory_tx.send(Ok(())).unwrap();
|
||||||
|
services
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
factory_tx.send(Err(err)).unwrap();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let worker_services = wrap_worker_services(services);
|
||||||
|
|
||||||
|
let worker_fut = async move {
|
||||||
|
// spawn to make sure ServerWorker runs as non boxed future.
|
||||||
|
spawn(async move {
|
||||||
|
ServerWorker {
|
||||||
|
conn_rx,
|
||||||
|
stop_rx,
|
||||||
|
services: worker_services.into_boxed_slice(),
|
||||||
|
counter: WorkerCounter::new(idx, waker_queue, counter),
|
||||||
|
factories: factories.into_boxed_slice(),
|
||||||
|
state: WorkerState::default(),
|
||||||
|
shutdown_timeout: config.shutdown_timeout,
|
||||||
|
}
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// wake up outermost task waiting for shutdown
|
||||||
|
worker_stopped_tx.send(()).unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
worker_stopped_rx.await.unwrap();
|
||||||
|
};
|
||||||
|
|
||||||
|
#[cfg(all(target_os = "linux", feature = "io-uring"))]
|
||||||
|
{
|
||||||
|
// TODO: pass max blocking thread config when tokio-uring enable configuration
|
||||||
|
// on building runtime.
|
||||||
|
let _ = config.max_blocking_threads;
|
||||||
|
tokio_uring::start(worker_fut);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
|
||||||
|
{
|
||||||
|
let rt = tokio::runtime::Builder::new_current_thread()
|
||||||
|
.enable_all()
|
||||||
|
.max_blocking_threads(config.max_blocking_threads)
|
||||||
|
.build()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
rt.block_on(ls.run_until(worker_fut));
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.expect("cannot spawn server worker thread");
|
||||||
|
}
|
||||||
|
|
||||||
|
// with actix system
|
||||||
|
(Some(_sys), _) => {
|
||||||
#[cfg(all(target_os = "linux", feature = "io-uring"))]
|
#[cfg(all(target_os = "linux", feature = "io-uring"))]
|
||||||
let arbiter = {
|
let arbiter = {
|
||||||
// TODO: pass max blocking thread config when tokio-uring enable configuration
|
// TODO: pass max blocking thread config when tokio-uring enable configuration
|
||||||
@@ -293,89 +415,63 @@ impl ServerWorker {
|
|||||||
Arbiter::new()
|
Arbiter::new()
|
||||||
};
|
};
|
||||||
|
|
||||||
// get actix system context if it is set
|
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
|
||||||
let sys = System::try_current();
|
let arbiter = {
|
||||||
|
Arbiter::with_tokio_rt(move || {
|
||||||
// service factories initialization channel
|
tokio::runtime::Builder::new_current_thread()
|
||||||
let (factory_tx, factory_rx) = std::sync::mpsc::sync_channel(1);
|
|
||||||
|
|
||||||
std::thread::Builder::new()
|
|
||||||
.name(format!("actix-server worker {}", idx))
|
|
||||||
.spawn(move || {
|
|
||||||
// forward existing actix system context
|
|
||||||
if let Some(sys) = sys {
|
|
||||||
System::set_current(sys);
|
|
||||||
}
|
|
||||||
|
|
||||||
let rt = tokio::runtime::Builder::new_current_thread()
|
|
||||||
.enable_all()
|
.enable_all()
|
||||||
.max_blocking_threads(config.max_blocking_threads)
|
.max_blocking_threads(config.max_blocking_threads)
|
||||||
.build()
|
.build()
|
||||||
.unwrap();
|
.unwrap()
|
||||||
|
|
||||||
rt.block_on(tokio::task::LocalSet::new().run_until(async move {
|
|
||||||
let fut = factories
|
|
||||||
.iter()
|
|
||||||
.enumerate()
|
|
||||||
.map(|(idx, factory)| {
|
|
||||||
let fut = factory.create();
|
|
||||||
async move { fut.await.map(|(t, s)| (idx, t, s)) }
|
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
|
||||||
|
|
||||||
// a second spawn to run !Send future tasks.
|
|
||||||
spawn(async move {
|
|
||||||
let res = join_all_local(fut)
|
|
||||||
.await
|
|
||||||
.into_iter()
|
|
||||||
.collect::<Result<Vec<_>, _>>();
|
|
||||||
|
|
||||||
let services = match res {
|
|
||||||
Ok(res) => res
|
|
||||||
.into_iter()
|
|
||||||
.fold(Vec::new(), |mut services, (factory, token, service)| {
|
|
||||||
assert_eq!(token, services.len());
|
|
||||||
services.push(WorkerService {
|
|
||||||
factory,
|
|
||||||
service,
|
|
||||||
status: WorkerServiceStatus::Unavailable,
|
|
||||||
});
|
|
||||||
services
|
|
||||||
})
|
|
||||||
.into_boxed_slice(),
|
|
||||||
|
|
||||||
Err(e) => {
|
|
||||||
error!("Can not start worker: {:?}", e);
|
|
||||||
Arbiter::try_current().as_ref().map(ArbiterHandle::stop);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
factory_tx.send(()).unwrap();
|
arbiter.spawn(async move {
|
||||||
|
// spawn_local to run !Send future tasks.
|
||||||
|
spawn(async move {
|
||||||
|
let mut services = Vec::new();
|
||||||
|
|
||||||
// a third spawn to make sure ServerWorker runs as non boxed future.
|
for (idx, factory) in factories.iter().enumerate() {
|
||||||
|
match factory.create().await {
|
||||||
|
Ok((token, svc)) => services.push((idx, token, svc)),
|
||||||
|
|
||||||
|
Err(err) => {
|
||||||
|
error!("Can not start worker: {:?}", err);
|
||||||
|
Arbiter::current().stop();
|
||||||
|
factory_tx
|
||||||
|
.send(Err(io::Error::new(
|
||||||
|
io::ErrorKind::Other,
|
||||||
|
format!("can not start server service {}", idx),
|
||||||
|
)))
|
||||||
|
.unwrap();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
factory_tx.send(Ok(())).unwrap();
|
||||||
|
|
||||||
|
let worker_services = wrap_worker_services(services);
|
||||||
|
|
||||||
|
// spawn to make sure ServerWorker runs as non boxed future.
|
||||||
spawn(ServerWorker {
|
spawn(ServerWorker {
|
||||||
rx,
|
conn_rx,
|
||||||
rx2,
|
stop_rx,
|
||||||
services,
|
services: worker_services.into_boxed_slice(),
|
||||||
counter: WorkerCounter::new(idx, waker_queue, counter_clone),
|
counter: WorkerCounter::new(idx, waker_queue, counter),
|
||||||
factories: factories.into_boxed_slice(),
|
factories: factories.into_boxed_slice(),
|
||||||
state: Default::default(),
|
state: Default::default(),
|
||||||
shutdown_timeout: config.shutdown_timeout,
|
shutdown_timeout: config.shutdown_timeout,
|
||||||
})
|
});
|
||||||
.await
|
});
|
||||||
.expect("task 3 panic");
|
});
|
||||||
})
|
}
|
||||||
.await
|
};
|
||||||
.expect("task 2 panic");
|
|
||||||
}))
|
|
||||||
})
|
|
||||||
.expect("worker thread error/panic");
|
|
||||||
|
|
||||||
// wait for service factories initialization
|
// wait for service factories initialization
|
||||||
factory_rx.recv().unwrap();
|
factory_rx.recv().unwrap()?;
|
||||||
|
|
||||||
Ok(handle_pair(idx, tx1, tx2, counter))
|
Ok(pair)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn restart_service(&mut self, idx: usize, factory_id: usize) {
|
fn restart_service(&mut self, idx: usize, factory_id: usize) {
|
||||||
@@ -413,7 +509,7 @@ impl ServerWorker {
|
|||||||
if srv.status == WorkerServiceStatus::Unavailable {
|
if srv.status == WorkerServiceStatus::Unavailable {
|
||||||
trace!(
|
trace!(
|
||||||
"Service {:?} is available",
|
"Service {:?} is available",
|
||||||
self.factories[srv.factory].name(idx)
|
self.factories[srv.factory_idx].name(idx)
|
||||||
);
|
);
|
||||||
srv.status = WorkerServiceStatus::Available;
|
srv.status = WorkerServiceStatus::Available;
|
||||||
}
|
}
|
||||||
@@ -424,7 +520,7 @@ impl ServerWorker {
|
|||||||
if srv.status == WorkerServiceStatus::Available {
|
if srv.status == WorkerServiceStatus::Available {
|
||||||
trace!(
|
trace!(
|
||||||
"Service {:?} is unavailable",
|
"Service {:?} is unavailable",
|
||||||
self.factories[srv.factory].name(idx)
|
self.factories[srv.factory_idx].name(idx)
|
||||||
);
|
);
|
||||||
srv.status = WorkerServiceStatus::Unavailable;
|
srv.status = WorkerServiceStatus::Unavailable;
|
||||||
}
|
}
|
||||||
@@ -432,10 +528,10 @@ impl ServerWorker {
|
|||||||
Poll::Ready(Err(_)) => {
|
Poll::Ready(Err(_)) => {
|
||||||
error!(
|
error!(
|
||||||
"Service {:?} readiness check returned error, restarting",
|
"Service {:?} readiness check returned error, restarting",
|
||||||
self.factories[srv.factory].name(idx)
|
self.factories[srv.factory_idx].name(idx)
|
||||||
);
|
);
|
||||||
srv.status = WorkerServiceStatus::Failed;
|
srv.status = WorkerServiceStatus::Failed;
|
||||||
return Err((idx, srv.factory));
|
return Err((idx, srv.factory_idx));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -478,7 +574,6 @@ impl Default for WorkerState {
|
|||||||
|
|
||||||
impl Drop for ServerWorker {
|
impl Drop for ServerWorker {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
trace!("stopping ServerWorker Arbiter");
|
|
||||||
Arbiter::try_current().as_ref().map(ArbiterHandle::stop);
|
Arbiter::try_current().as_ref().map(ArbiterHandle::stop);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -490,7 +585,8 @@ impl Future for ServerWorker {
|
|||||||
let this = self.as_mut().get_mut();
|
let this = self.as_mut().get_mut();
|
||||||
|
|
||||||
// `StopWorker` message handler
|
// `StopWorker` message handler
|
||||||
if let Poll::Ready(Some(Stop { graceful, tx })) = Pin::new(&mut this.rx2).poll_recv(cx)
|
if let Poll::Ready(Some(Stop { graceful, tx })) =
|
||||||
|
Pin::new(&mut this.stop_rx).poll_recv(cx)
|
||||||
{
|
{
|
||||||
let num = this.counter.total();
|
let num = this.counter.total();
|
||||||
if num == 0 {
|
if num == 0 {
|
||||||
@@ -552,6 +648,14 @@ impl Future for ServerWorker {
|
|||||||
self.poll(cx)
|
self.poll(cx)
|
||||||
}
|
}
|
||||||
WorkerState::Shutdown(ref mut shutdown) => {
|
WorkerState::Shutdown(ref mut shutdown) => {
|
||||||
|
// drop all pending connections in rx channel.
|
||||||
|
while let Poll::Ready(Some(conn)) = Pin::new(&mut this.conn_rx).poll_recv(cx) {
|
||||||
|
// WorkerCounterGuard is needed as Accept thread has incremented counter.
|
||||||
|
// It's guard's job to decrement the counter together with drop of Conn.
|
||||||
|
let guard = this.counter.guard();
|
||||||
|
drop((conn, guard));
|
||||||
|
}
|
||||||
|
|
||||||
// wait for 1 second
|
// wait for 1 second
|
||||||
ready!(shutdown.timer.as_mut().poll(cx));
|
ready!(shutdown.timer.as_mut().poll(cx));
|
||||||
|
|
||||||
@@ -592,7 +696,7 @@ impl Future for ServerWorker {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// handle incoming io stream
|
// handle incoming io stream
|
||||||
match ready!(Pin::new(&mut this.rx).poll_recv(cx)) {
|
match ready!(Pin::new(&mut this.conn_rx).poll_recv(cx)) {
|
||||||
Some(msg) => {
|
Some(msg) => {
|
||||||
let guard = this.counter.guard();
|
let guard = this.counter.guard();
|
||||||
let _ = this.services[msg.token].service.call((guard, msg.io));
|
let _ = this.services[msg.token].service.call((guard, msg.io));
|
||||||
@@ -603,3 +707,19 @@ impl Future for ServerWorker {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn wrap_worker_services(
|
||||||
|
services: Vec<(usize, usize, BoxedServerService)>,
|
||||||
|
) -> Vec<WorkerService> {
|
||||||
|
services
|
||||||
|
.into_iter()
|
||||||
|
.fold(Vec::new(), |mut services, (idx, token, service)| {
|
||||||
|
assert_eq!(token, services.len());
|
||||||
|
services.push(WorkerService {
|
||||||
|
factory_idx: idx,
|
||||||
|
service,
|
||||||
|
status: WorkerServiceStatus::Unavailable,
|
||||||
|
});
|
||||||
|
services
|
||||||
|
})
|
||||||
|
}
|
||||||
|
@@ -1,18 +1,19 @@
|
|||||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
use std::{
|
||||||
use std::sync::{mpsc, Arc};
|
net,
|
||||||
use std::{net, thread, time::Duration};
|
sync::{
|
||||||
|
atomic::{AtomicUsize, Ordering},
|
||||||
|
mpsc, Arc,
|
||||||
|
},
|
||||||
|
thread,
|
||||||
|
time::Duration,
|
||||||
|
};
|
||||||
|
|
||||||
use actix_rt::{net::TcpStream, time::sleep};
|
use actix_rt::{net::TcpStream, time::sleep};
|
||||||
use actix_server::Server;
|
use actix_server::{Server, TestServer};
|
||||||
use actix_service::fn_service;
|
use actix_service::fn_service;
|
||||||
|
|
||||||
fn unused_addr() -> net::SocketAddr {
|
fn unused_addr() -> net::SocketAddr {
|
||||||
let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();
|
TestServer::unused_addr()
|
||||||
let socket = mio::net::TcpSocket::new_v4().unwrap();
|
|
||||||
socket.bind(addr).unwrap();
|
|
||||||
socket.set_reuseaddr(true).unwrap();
|
|
||||||
let tcp = socket.listen(32).unwrap();
|
|
||||||
tcp.local_addr().unwrap()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@@ -30,28 +31,63 @@ fn test_bind() {
|
|||||||
})?
|
})?
|
||||||
.run();
|
.run();
|
||||||
|
|
||||||
let _ = tx.send((srv.handle(), actix_rt::System::current()));
|
let _ = tx.send(srv.handle());
|
||||||
|
|
||||||
srv.await
|
srv.await
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
let (srv, sys) = rx.recv().unwrap();
|
let srv = rx.recv().unwrap();
|
||||||
|
|
||||||
|
thread::sleep(Duration::from_millis(500));
|
||||||
|
assert!(net::TcpStream::connect(addr).is_ok());
|
||||||
|
|
||||||
|
let _ = srv.stop(true);
|
||||||
|
h.join().unwrap().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn plain_tokio_runtime() {
|
||||||
|
let addr = unused_addr();
|
||||||
|
let (tx, rx) = mpsc::channel();
|
||||||
|
|
||||||
|
let h = thread::spawn(move || {
|
||||||
|
let rt = tokio::runtime::Builder::new_current_thread()
|
||||||
|
.enable_all()
|
||||||
|
.build()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
rt.block_on(async {
|
||||||
|
let srv = Server::build()
|
||||||
|
.workers(1)
|
||||||
|
.disable_signals()
|
||||||
|
.bind("test", addr, move || {
|
||||||
|
fn_service(|_| async { Ok::<_, ()>(()) })
|
||||||
|
})?
|
||||||
|
.run();
|
||||||
|
|
||||||
|
tx.send(srv.handle()).unwrap();
|
||||||
|
|
||||||
|
srv.await
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
let srv = rx.recv().unwrap();
|
||||||
|
|
||||||
thread::sleep(Duration::from_millis(500));
|
thread::sleep(Duration::from_millis(500));
|
||||||
assert!(net::TcpStream::connect(addr).is_ok());
|
assert!(net::TcpStream::connect(addr).is_ok());
|
||||||
|
|
||||||
let _ = srv.stop(true);
|
let _ = srv.stop(true);
|
||||||
sys.stop();
|
|
||||||
h.join().unwrap().unwrap();
|
h.join().unwrap().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_listen() {
|
fn test_listen() {
|
||||||
let addr = unused_addr();
|
let addr = unused_addr();
|
||||||
|
let lst = net::TcpListener::bind(addr).unwrap();
|
||||||
|
|
||||||
let (tx, rx) = mpsc::channel();
|
let (tx, rx) = mpsc::channel();
|
||||||
|
|
||||||
let h = thread::spawn(move || {
|
let h = thread::spawn(move || {
|
||||||
let lst = net::TcpListener::bind(addr)?;
|
|
||||||
actix_rt::System::new().block_on(async {
|
actix_rt::System::new().block_on(async {
|
||||||
let srv = Server::build()
|
let srv = Server::build()
|
||||||
.disable_signals()
|
.disable_signals()
|
||||||
@@ -61,19 +97,18 @@ fn test_listen() {
|
|||||||
})?
|
})?
|
||||||
.run();
|
.run();
|
||||||
|
|
||||||
let _ = tx.send((srv.handle(), actix_rt::System::current()));
|
let _ = tx.send(srv.handle());
|
||||||
|
|
||||||
srv.await
|
srv.await
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
|
|
||||||
let (srv, sys) = rx.recv().unwrap();
|
let srv = rx.recv().unwrap();
|
||||||
|
|
||||||
thread::sleep(Duration::from_millis(500));
|
thread::sleep(Duration::from_millis(500));
|
||||||
assert!(net::TcpStream::connect(addr).is_ok());
|
assert!(net::TcpStream::connect(addr).is_ok());
|
||||||
|
|
||||||
let _ = srv.stop(true);
|
let _ = srv.stop(true);
|
||||||
sys.stop();
|
|
||||||
h.join().unwrap().unwrap();
|
h.join().unwrap().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -280,12 +315,12 @@ async fn test_service_restart() {
|
|||||||
.workers(1)
|
.workers(1)
|
||||||
.run();
|
.run();
|
||||||
|
|
||||||
let _ = tx.send((srv.handle(), actix_rt::System::current()));
|
let _ = tx.send(srv.handle());
|
||||||
srv.await
|
srv.await
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
|
|
||||||
let (srv, sys) = rx.recv().unwrap();
|
let srv = rx.recv().unwrap();
|
||||||
|
|
||||||
for _ in 0..5 {
|
for _ in 0..5 {
|
||||||
TcpStream::connect(addr1)
|
TcpStream::connect(addr1)
|
||||||
@@ -308,7 +343,6 @@ async fn test_service_restart() {
|
|||||||
assert!(num2_clone.load(Ordering::SeqCst) > 5);
|
assert!(num2_clone.load(Ordering::SeqCst) > 5);
|
||||||
|
|
||||||
let _ = srv.stop(false);
|
let _ = srv.stop(false);
|
||||||
sys.stop();
|
|
||||||
h.join().unwrap().unwrap();
|
h.join().unwrap().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -385,13 +419,13 @@ async fn worker_restart() {
|
|||||||
.workers(2)
|
.workers(2)
|
||||||
.run();
|
.run();
|
||||||
|
|
||||||
let _ = tx.send((srv.handle(), actix_rt::System::current()));
|
let _ = tx.send(srv.handle());
|
||||||
|
|
||||||
srv.await
|
srv.await
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
|
|
||||||
let (srv, sys) = rx.recv().unwrap();
|
let srv = rx.recv().unwrap();
|
||||||
|
|
||||||
sleep(Duration::from_secs(3)).await;
|
sleep(Duration::from_secs(3)).await;
|
||||||
|
|
||||||
@@ -449,6 +483,31 @@ async fn worker_restart() {
|
|||||||
stream.shutdown().await.unwrap();
|
stream.shutdown().await.unwrap();
|
||||||
|
|
||||||
let _ = srv.stop(false);
|
let _ = srv.stop(false);
|
||||||
sys.stop();
|
|
||||||
h.join().unwrap().unwrap();
|
h.join().unwrap().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[should_panic]
|
||||||
|
fn no_runtime() {
|
||||||
|
// test set up in a way that would prevent time out if support for runtime-less init was added
|
||||||
|
|
||||||
|
let addr = unused_addr();
|
||||||
|
|
||||||
|
let srv = Server::build()
|
||||||
|
.workers(1)
|
||||||
|
.disable_signals()
|
||||||
|
.bind("test", addr, move || {
|
||||||
|
fn_service(|_| async { Ok::<_, ()>(()) })
|
||||||
|
})
|
||||||
|
.unwrap()
|
||||||
|
.run();
|
||||||
|
|
||||||
|
let rt = tokio::runtime::Builder::new_current_thread()
|
||||||
|
.enable_all()
|
||||||
|
.build()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let _ = srv.handle().stop(true);
|
||||||
|
|
||||||
|
rt.block_on(async { srv.await }).unwrap();
|
||||||
|
}
|
||||||
|
@@ -56,221 +56,124 @@
|
|||||||
|
|
||||||
|
|
||||||
## 1.0.6 - 2020-08-09
|
## 1.0.6 - 2020-08-09
|
||||||
|
|
||||||
### Fixed
|
|
||||||
|
|
||||||
* Removed unsound custom Cell implementation that allowed obtaining several mutable references to
|
* Removed unsound custom Cell implementation that allowed obtaining several mutable references to
|
||||||
the same data, which is undefined behavior in Rust and could lead to violations of memory safety. External code could obtain several mutable references to the same data through
|
the same data, which is undefined behavior in Rust and could lead to violations of memory safety. External code could obtain several mutable references to the same data through
|
||||||
service combinators. Attempts to acquire several mutable references to the same data will instead
|
service combinators. Attempts to acquire several mutable references to the same data will instead
|
||||||
result in a panic.
|
result in a panic.
|
||||||
|
|
||||||
## [1.0.5] - 2020-01-16
|
|
||||||
|
|
||||||
### Fixed
|
## 1.0.5 - 2020-01-16
|
||||||
|
* Fixed unsoundness in .and_then()/.then() service combinators.
|
||||||
|
|
||||||
* Fixed unsoundness in .and_then()/.then() service combinators
|
|
||||||
|
|
||||||
## [1.0.4] - 2020-01-15
|
|
||||||
|
|
||||||
### Fixed
|
|
||||||
|
|
||||||
|
## 1.0.4 - 2020-01-15
|
||||||
* Revert 1.0.3 change
|
* Revert 1.0.3 change
|
||||||
|
|
||||||
## [1.0.3] - 2020-01-15
|
|
||||||
|
|
||||||
### Fixed
|
## 1.0.3 - 2020-01-15
|
||||||
|
* Fixed unsoundness in `AndThenService` impl.
|
||||||
* Fixed unsoundness in `AndThenService` impl
|
|
||||||
|
|
||||||
## [1.0.2] - 2020-01-08
|
|
||||||
|
|
||||||
### Added
|
|
||||||
|
|
||||||
* Add `into_service` helper function
|
|
||||||
|
|
||||||
|
|
||||||
## [1.0.1] - 2019-12-22
|
## 1.0.2 - 2020-01-08
|
||||||
|
* Add `into_service` helper function.
|
||||||
### Changed
|
|
||||||
|
|
||||||
* `map_config()` and `unit_config()` accepts `IntoServiceFactory` type
|
|
||||||
|
|
||||||
|
|
||||||
## [1.0.0] - 2019-12-11
|
## 1.0.1 - 2019-12-22
|
||||||
|
* `map_config()` and `unit_config()` now accept `IntoServiceFactory` type.
|
||||||
|
|
||||||
### Added
|
|
||||||
|
|
||||||
|
## 1.0.0 - 2019-12-11
|
||||||
* Add Clone impl for Apply service
|
* Add Clone impl for Apply service
|
||||||
|
|
||||||
|
|
||||||
## [1.0.0-alpha.4] - 2019-12-08
|
## 1.0.0-alpha.4 - 2019-12-08
|
||||||
|
|
||||||
### Changed
|
|
||||||
|
|
||||||
* Renamed `service_fn` to `fn_service`
|
* Renamed `service_fn` to `fn_service`
|
||||||
|
|
||||||
* Renamed `factory_fn` to `fn_factory`
|
* Renamed `factory_fn` to `fn_factory`
|
||||||
|
|
||||||
* Renamed `factory_fn_cfg` to `fn_factory_with_config`
|
* Renamed `factory_fn_cfg` to `fn_factory_with_config`
|
||||||
|
|
||||||
|
|
||||||
## [1.0.0-alpha.3] - 2019-12-06
|
## 1.0.0-alpha.3 - 2019-12-06
|
||||||
|
|
||||||
### Changed
|
|
||||||
|
|
||||||
* Add missing Clone impls
|
* Add missing Clone impls
|
||||||
|
|
||||||
* Restore `Transform::map_init_err()` combinator
|
* Restore `Transform::map_init_err()` combinator
|
||||||
|
|
||||||
* Restore `Service/Factory::apply_fn()` in form of `Pipeline/Factory::and_then_apply_fn()`
|
* Restore `Service/Factory::apply_fn()` in form of `Pipeline/Factory::and_then_apply_fn()`
|
||||||
|
|
||||||
* Optimize service combinators and futures memory layout
|
* Optimize service combinators and futures memory layout
|
||||||
|
|
||||||
|
|
||||||
## [1.0.0-alpha.2] - 2019-12-02
|
## 1.0.0-alpha.2 - 2019-12-02
|
||||||
|
|
||||||
### Changed
|
|
||||||
|
|
||||||
* Use owned config value for service factory
|
* Use owned config value for service factory
|
||||||
|
|
||||||
* Renamed BoxedNewService/BoxedService to BoxServiceFactory/BoxService
|
* Renamed BoxedNewService/BoxedService to BoxServiceFactory/BoxService
|
||||||
|
|
||||||
|
|
||||||
## [1.0.0-alpha.1] - 2019-11-25
|
## 1.0.0-alpha.1 - 2019-11-25
|
||||||
|
* Migrated to `std::future`
|
||||||
### Changed
|
|
||||||
|
|
||||||
* Migraded to `std::future`
|
|
||||||
|
|
||||||
* `NewService` renamed to `ServiceFactory`
|
* `NewService` renamed to `ServiceFactory`
|
||||||
|
|
||||||
* Added `pipeline` and `pipeline_factory` function
|
* Added `pipeline` and `pipeline_factory` function
|
||||||
|
|
||||||
|
|
||||||
## [0.4.2] - 2019-08-27
|
## 0.4.2 - 2019-08-27
|
||||||
|
|
||||||
### Fixed
|
|
||||||
|
|
||||||
* Check service readiness for `new_apply_cfg` combinator
|
* Check service readiness for `new_apply_cfg` combinator
|
||||||
|
|
||||||
|
|
||||||
## [0.4.1] - 2019-06-06
|
## 0.4.1 - 2019-06-06
|
||||||
|
|
||||||
### Added
|
|
||||||
|
|
||||||
* Add `new_apply_cfg` function
|
* Add `new_apply_cfg` function
|
||||||
|
|
||||||
## [0.4.0] - 2019-05-12
|
|
||||||
|
|
||||||
### Changed
|
## 0.4.0 - 2019-05-12
|
||||||
|
* Add `NewService::map_config` and `NewService::unit_config` combinators.
|
||||||
* Use associated type for `NewService` config
|
* Use associated type for `NewService` config.
|
||||||
|
* Change `apply_cfg` function.
|
||||||
* Change `apply_cfg` function
|
* Renamed helper functions.
|
||||||
|
|
||||||
* Renamed helper functions
|
|
||||||
|
|
||||||
### Added
|
|
||||||
|
|
||||||
* Add `NewService::map_config` and `NewService::unit_config` combinators
|
|
||||||
|
|
||||||
|
|
||||||
## [0.3.6] - 2019-04-07
|
## 0.3.6 - 2019-04-07
|
||||||
|
|
||||||
### Changed
|
|
||||||
|
|
||||||
* Poll boxed service call result immediately
|
* Poll boxed service call result immediately
|
||||||
|
|
||||||
|
|
||||||
## [0.3.5] - 2019-03-29
|
## 0.3.5 - 2019-03-29
|
||||||
|
* Add `impl<S: Service> Service for Rc<RefCell<S>>`.
|
||||||
### Added
|
|
||||||
|
|
||||||
* Add `impl<S: Service> Service for Rc<RefCell<S>>`
|
|
||||||
|
|
||||||
|
|
||||||
## [0.3.4] - 2019-03-12
|
## 0.3.4 - 2019-03-12
|
||||||
|
|
||||||
### Added
|
|
||||||
|
|
||||||
* Add `Transform::from_err()` combinator
|
* Add `Transform::from_err()` combinator
|
||||||
|
|
||||||
* Add `apply_fn` helper
|
* Add `apply_fn` helper
|
||||||
|
|
||||||
* Add `apply_fn_factory` helper
|
* Add `apply_fn_factory` helper
|
||||||
|
|
||||||
* Add `apply_transform` helper
|
* Add `apply_transform` helper
|
||||||
|
|
||||||
* Add `apply_cfg` helper
|
* Add `apply_cfg` helper
|
||||||
|
|
||||||
|
|
||||||
## [0.3.3] - 2019-03-09
|
## 0.3.3 - 2019-03-09
|
||||||
|
|
||||||
### Added
|
|
||||||
|
|
||||||
* Add `ApplyTransform` new service for transform and new service.
|
* Add `ApplyTransform` new service for transform and new service.
|
||||||
|
* Add `NewService::apply_cfg()` combinator, allows to use nested `NewService` with different config parameter.
|
||||||
* Add `NewService::apply_cfg()` combinator, allows to use
|
|
||||||
nested `NewService` with different config parameter.
|
|
||||||
|
|
||||||
### Changed
|
|
||||||
|
|
||||||
* Revert IntoFuture change
|
* Revert IntoFuture change
|
||||||
|
|
||||||
|
|
||||||
## [0.3.2] - 2019-03-04
|
## 0.3.2 - 2019-03-04
|
||||||
|
|
||||||
### Changed
|
|
||||||
|
|
||||||
* Change `NewService::Future` and `Transform::Future` to the `IntoFuture` trait.
|
* Change `NewService::Future` and `Transform::Future` to the `IntoFuture` trait.
|
||||||
|
|
||||||
* Export `AndThenTransform` type
|
* Export `AndThenTransform` type
|
||||||
|
|
||||||
|
|
||||||
## [0.3.1] - 2019-03-04
|
## 0.3.1 - 2019-03-04
|
||||||
|
|
||||||
### Changed
|
|
||||||
|
|
||||||
* Simplify Transform trait
|
* Simplify Transform trait
|
||||||
|
|
||||||
|
|
||||||
## [0.3.0] - 2019-03-02
|
## 0.3.0 - 2019-03-02
|
||||||
|
|
||||||
## Added
|
|
||||||
|
|
||||||
* Added boxed NewService and Service.
|
* Added boxed NewService and Service.
|
||||||
|
|
||||||
## Changed
|
|
||||||
|
|
||||||
* Added `Config` parameter to `NewService` trait.
|
* Added `Config` parameter to `NewService` trait.
|
||||||
|
|
||||||
* Added `Config` parameter to `NewTransform` trait.
|
* Added `Config` parameter to `NewTransform` trait.
|
||||||
|
|
||||||
|
|
||||||
## [0.2.2] - 2019-02-19
|
## 0.2.2 - 2019-02-19
|
||||||
|
|
||||||
### Added
|
|
||||||
|
|
||||||
* Added `NewService` impl for `Rc<S> where S: NewService`
|
* Added `NewService` impl for `Rc<S> where S: NewService`
|
||||||
|
|
||||||
* Added `NewService` impl for `Arc<S> where S: NewService`
|
* Added `NewService` impl for `Arc<S> where S: NewService`
|
||||||
|
|
||||||
|
|
||||||
## [0.2.1] - 2019-02-03
|
## 0.2.1 - 2019-02-03
|
||||||
|
|
||||||
### Changed
|
|
||||||
|
|
||||||
* Generalize `.apply` combinator with Transform trait
|
* Generalize `.apply` combinator with Transform trait
|
||||||
|
|
||||||
|
|
||||||
## [0.2.0] - 2019-02-01
|
## 0.2.0 - 2019-02-01
|
||||||
|
|
||||||
### Changed
|
|
||||||
|
|
||||||
* Use associated type instead of generic for Service definition.
|
* Use associated type instead of generic for Service definition.
|
||||||
|
|
||||||
* Before:
|
* Before:
|
||||||
|
|
||||||
```rust
|
```rust
|
||||||
impl Service<Request> for Client {
|
impl Service<Request> for Client {
|
||||||
type Response = Response;
|
type Response = Response;
|
||||||
@@ -278,7 +181,6 @@
|
|||||||
}
|
}
|
||||||
```
|
```
|
||||||
* After:
|
* After:
|
||||||
|
|
||||||
```rust
|
```rust
|
||||||
impl Service for Client {
|
impl Service for Client {
|
||||||
type Request = Request;
|
type Request = Request;
|
||||||
@@ -288,50 +190,30 @@
|
|||||||
```
|
```
|
||||||
|
|
||||||
|
|
||||||
## [0.1.6] - 2019-01-24
|
## 0.1.6 - 2019-01-24
|
||||||
|
|
||||||
### Changed
|
|
||||||
|
|
||||||
* Use `FnMut` instead of `Fn` for .apply() and .map() combinators and `FnService` type
|
* Use `FnMut` instead of `Fn` for .apply() and .map() combinators and `FnService` type
|
||||||
|
|
||||||
* Change `.apply()` error semantic, new service's error is `From<Self::Error>`
|
* Change `.apply()` error semantic, new service's error is `From<Self::Error>`
|
||||||
|
|
||||||
|
|
||||||
## [0.1.5] - 2019-01-13
|
## 0.1.5 - 2019-01-13
|
||||||
|
* Make `Out::Error` convertible from `T::Error` for apply combinator
|
||||||
### Changed
|
|
||||||
|
|
||||||
* Make `Out::Error` convertable from `T::Error` for apply combinator
|
|
||||||
|
|
||||||
|
|
||||||
## [0.1.4] - 2019-01-11
|
## 0.1.4 - 2019-01-11
|
||||||
|
|
||||||
### Changed
|
|
||||||
|
|
||||||
* Use `FnMut` instead of `Fn` for `FnService`
|
* Use `FnMut` instead of `Fn` for `FnService`
|
||||||
|
|
||||||
|
|
||||||
## [0.1.3] - 2018-12-12
|
## 0.1.3 - 2018-12-12
|
||||||
|
|
||||||
### Changed
|
|
||||||
|
|
||||||
* Split service combinators to separate trait
|
* Split service combinators to separate trait
|
||||||
|
|
||||||
|
|
||||||
## [0.1.2] - 2018-12-12
|
## 0.1.2 - 2018-12-12
|
||||||
|
|
||||||
### Fixed
|
|
||||||
|
|
||||||
* Release future early for `.and_then()` and `.then()` combinators
|
* Release future early for `.and_then()` and `.then()` combinators
|
||||||
|
|
||||||
|
|
||||||
## [0.1.1] - 2018-12-09
|
## 0.1.1 - 2018-12-09
|
||||||
|
* Added Service impl for `Box<S: Service>`
|
||||||
### Added
|
|
||||||
|
|
||||||
* Added Service impl for Box<S: Service>
|
|
||||||
|
|
||||||
|
|
||||||
## [0.1.0] - 2018-12-09
|
## 0.1.0 - 2018-12-09
|
||||||
|
|
||||||
* Initial import
|
* Initial import
|
||||||
|
@@ -1,7 +1,7 @@
|
|||||||
/// An implementation of [`poll_ready`]() that always signals readiness.
|
/// An implementation of [`poll_ready`]() that always signals readiness.
|
||||||
///
|
///
|
||||||
/// This should only be used for basic leaf services that have no concept of un-readiness.
|
/// This should only be used for basic leaf services that have no concept of un-readiness.
|
||||||
/// For wrapper or other serivice types, use [`forward_ready!`] for simple cases or write a bespoke
|
/// For wrapper or other service types, use [`forward_ready!`] for simple cases or write a bespoke
|
||||||
/// `poll_ready` implementation.
|
/// `poll_ready` implementation.
|
||||||
///
|
///
|
||||||
/// [`poll_ready`]: crate::Service::poll_ready
|
/// [`poll_ready`]: crate::Service::poll_ready
|
||||||
|
@@ -3,6 +3,22 @@
|
|||||||
## Unreleased - 2021-xx-xx
|
## Unreleased - 2021-xx-xx
|
||||||
|
|
||||||
|
|
||||||
|
## 3.0.0-beta.9 - 2021-11-22
|
||||||
|
* Add configurable timeout for accepting TLS connection. [#393]
|
||||||
|
* Added `TlsError::Timeout` variant. [#393]
|
||||||
|
* All TLS acceptor services now use `TlsError` for their error types. [#393]
|
||||||
|
* Added `TlsError::into_service_error`. [#420]
|
||||||
|
|
||||||
|
[#393]: https://github.com/actix/actix-net/pull/393
|
||||||
|
[#420]: https://github.com/actix/actix-net/pull/420
|
||||||
|
|
||||||
|
|
||||||
|
## 3.0.0-beta.8 - 2021-11-15
|
||||||
|
* Add `Connect::request` for getting a reference to the connection request. [#415]
|
||||||
|
|
||||||
|
[#415]: https://github.com/actix/actix-net/pull/415
|
||||||
|
|
||||||
|
|
||||||
## 3.0.0-beta.7 - 2021-10-20
|
## 3.0.0-beta.7 - 2021-10-20
|
||||||
* Add `webpki_roots_cert_store()` to get rustls compatible webpki roots cert store. [#401]
|
* Add `webpki_roots_cert_store()` to get rustls compatible webpki roots cert store. [#401]
|
||||||
* Alias `connect::ssl` to `connect::tls`. [#401]
|
* Alias `connect::ssl` to `connect::tls`. [#401]
|
||||||
|
@@ -1,7 +1,10 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actix-tls"
|
name = "actix-tls"
|
||||||
version = "3.0.0-beta.7"
|
version = "3.0.0-beta.9"
|
||||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
authors = [
|
||||||
|
"Nikolay Kim <fafhrd91@gmail.com>",
|
||||||
|
"Rob Ede <robjtede@icloud.com>",
|
||||||
|
]
|
||||||
description = "TLS acceptor and connector services for Actix ecosystem"
|
description = "TLS acceptor and connector services for Actix ecosystem"
|
||||||
keywords = ["network", "tls", "ssl", "async", "transport"]
|
keywords = ["network", "tls", "ssl", "async", "transport"]
|
||||||
repository = "https://github.com/actix/actix-net.git"
|
repository = "https://github.com/actix/actix-net.git"
|
||||||
@@ -47,6 +50,7 @@ derive_more = "0.99.5"
|
|||||||
futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }
|
futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }
|
||||||
http = { version = "0.2.3", optional = true }
|
http = { version = "0.2.3", optional = true }
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
|
pin-project-lite = "0.2.7"
|
||||||
tokio-util = { version = "0.6.3", default-features = false }
|
tokio-util = { version = "0.6.3", default-features = false }
|
||||||
|
|
||||||
# openssl
|
# openssl
|
||||||
@@ -62,12 +66,14 @@ tokio-native-tls = { version = "0.3", optional = true }
|
|||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
actix-rt = "2.2.0"
|
actix-rt = "2.2.0"
|
||||||
actix-server = "2.0.0-beta.6"
|
actix-server = "2.0.0-beta.9"
|
||||||
bytes = "1"
|
bytes = "1"
|
||||||
env_logger = "0.9"
|
env_logger = "0.9"
|
||||||
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
|
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
|
rcgen = "0.8"
|
||||||
rustls-pemfile = "0.2.1"
|
rustls-pemfile = "0.2.1"
|
||||||
|
tokio-rustls = { version = "0.23", features = ["dangerous_configuration"] }
|
||||||
trust-dns-resolver = "0.20.0"
|
trust-dns-resolver = "0.20.0"
|
||||||
|
|
||||||
[[example]]
|
[[example]]
|
||||||
|
@@ -1,11 +1,9 @@
|
|||||||
//! TLS acceptor services for Actix ecosystem.
|
//! TLS acceptor services.
|
||||||
//!
|
|
||||||
//! ## Crate Features
|
|
||||||
//! * `openssl` - TLS acceptor using the `openssl` crate.
|
|
||||||
//! * `rustls` - TLS acceptor using the `rustls` crate.
|
|
||||||
//! * `native-tls` - TLS acceptor using the `native-tls` crate.
|
|
||||||
|
|
||||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
use std::{
|
||||||
|
convert::Infallible,
|
||||||
|
sync::atomic::{AtomicUsize, Ordering},
|
||||||
|
};
|
||||||
|
|
||||||
use actix_utils::counter::Counter;
|
use actix_utils::counter::Counter;
|
||||||
|
|
||||||
@@ -20,6 +18,10 @@ pub mod native_tls;
|
|||||||
|
|
||||||
pub(crate) static MAX_CONN: AtomicUsize = AtomicUsize::new(256);
|
pub(crate) static MAX_CONN: AtomicUsize = AtomicUsize::new(256);
|
||||||
|
|
||||||
|
#[cfg(any(feature = "openssl", feature = "rustls", feature = "native-tls"))]
|
||||||
|
pub(crate) const DEFAULT_TLS_HANDSHAKE_TIMEOUT: std::time::Duration =
|
||||||
|
std::time::Duration::from_secs(3);
|
||||||
|
|
||||||
thread_local! {
|
thread_local! {
|
||||||
static MAX_CONN_COUNTER: Counter = Counter::new(MAX_CONN.load(Ordering::Relaxed));
|
static MAX_CONN_COUNTER: Counter = Counter::new(MAX_CONN.load(Ordering::Relaxed));
|
||||||
}
|
}
|
||||||
@@ -36,7 +38,30 @@ pub fn max_concurrent_tls_connect(num: usize) {
|
|||||||
|
|
||||||
/// TLS error combined with service error.
|
/// TLS error combined with service error.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum TlsError<E1, E2> {
|
pub enum TlsError<TlsErr, SvcErr> {
|
||||||
Tls(E1),
|
Timeout,
|
||||||
Service(E2),
|
Tls(TlsErr),
|
||||||
|
Service(SvcErr),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<TlsErr> TlsError<TlsErr, Infallible> {
|
||||||
|
/// Casts the infallible service error type returned from acceptors into caller's type.
|
||||||
|
pub fn into_service_error<SvcErr>(self) -> TlsError<TlsErr, SvcErr> {
|
||||||
|
match self {
|
||||||
|
Self::Timeout => TlsError::Timeout,
|
||||||
|
Self::Tls(err) => TlsError::Tls(err),
|
||||||
|
Self::Service(_) => unreachable!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn tls_service_error_inference() {
|
||||||
|
let a: TlsError<u32, Infallible> = TlsError::Tls(42);
|
||||||
|
let _b: TlsError<u32, u64> = a.into_service_error();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@@ -1,20 +1,24 @@
|
|||||||
use std::{
|
use std::{
|
||||||
|
convert::Infallible,
|
||||||
io::{self, IoSlice},
|
io::{self, IoSlice},
|
||||||
ops::{Deref, DerefMut},
|
ops::{Deref, DerefMut},
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
use actix_codec::{AsyncRead, AsyncWrite, ReadBuf};
|
use actix_codec::{AsyncRead, AsyncWrite, ReadBuf};
|
||||||
use actix_rt::net::{ActixStream, Ready};
|
use actix_rt::{
|
||||||
|
net::{ActixStream, Ready},
|
||||||
|
time::timeout,
|
||||||
|
};
|
||||||
use actix_service::{Service, ServiceFactory};
|
use actix_service::{Service, ServiceFactory};
|
||||||
use actix_utils::counter::Counter;
|
use actix_utils::counter::Counter;
|
||||||
use futures_core::future::LocalBoxFuture;
|
use futures_core::future::LocalBoxFuture;
|
||||||
|
|
||||||
pub use tokio_native_tls::native_tls::Error;
|
pub use tokio_native_tls::{native_tls::Error, TlsAcceptor};
|
||||||
pub use tokio_native_tls::TlsAcceptor;
|
|
||||||
|
|
||||||
use super::MAX_CONN_COUNTER;
|
use super::{TlsError, DEFAULT_TLS_HANDSHAKE_TIMEOUT, MAX_CONN_COUNTER};
|
||||||
|
|
||||||
/// Wrapper type for `tokio_native_tls::TlsStream` in order to impl `ActixStream` trait.
|
/// Wrapper type for `tokio_native_tls::TlsStream` in order to impl `ActixStream` trait.
|
||||||
pub struct TlsStream<T>(tokio_native_tls::TlsStream<T>);
|
pub struct TlsStream<T>(tokio_native_tls::TlsStream<T>);
|
||||||
@@ -94,13 +98,25 @@ impl<T: ActixStream> ActixStream for TlsStream<T> {
|
|||||||
/// `native-tls` feature enables this `Acceptor` type.
|
/// `native-tls` feature enables this `Acceptor` type.
|
||||||
pub struct Acceptor {
|
pub struct Acceptor {
|
||||||
acceptor: TlsAcceptor,
|
acceptor: TlsAcceptor,
|
||||||
|
handshake_timeout: Duration,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Acceptor {
|
impl Acceptor {
|
||||||
/// Create `native-tls` based `Acceptor` service factory.
|
/// Create `native-tls` based `Acceptor` service factory.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn new(acceptor: TlsAcceptor) -> Self {
|
pub fn new(acceptor: TlsAcceptor) -> Self {
|
||||||
Acceptor { acceptor }
|
Acceptor {
|
||||||
|
acceptor,
|
||||||
|
handshake_timeout: DEFAULT_TLS_HANDSHAKE_TIMEOUT,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Limit the amount of time that the acceptor will wait for a TLS handshake to complete.
|
||||||
|
///
|
||||||
|
/// Default timeout is 3 seconds.
|
||||||
|
pub fn set_handshake_timeout(&mut self, handshake_timeout: Duration) -> &mut Self {
|
||||||
|
self.handshake_timeout = handshake_timeout;
|
||||||
|
self
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -109,15 +125,15 @@ impl Clone for Acceptor {
|
|||||||
fn clone(&self) -> Self {
|
fn clone(&self) -> Self {
|
||||||
Self {
|
Self {
|
||||||
acceptor: self.acceptor.clone(),
|
acceptor: self.acceptor.clone(),
|
||||||
|
handshake_timeout: self.handshake_timeout,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: ActixStream + 'static> ServiceFactory<T> for Acceptor {
|
impl<T: ActixStream + 'static> ServiceFactory<T> for Acceptor {
|
||||||
type Response = TlsStream<T>;
|
type Response = TlsStream<T>;
|
||||||
type Error = Error;
|
type Error = TlsError<Error, Infallible>;
|
||||||
type Config = ();
|
type Config = ();
|
||||||
|
|
||||||
type Service = NativeTlsAcceptorService;
|
type Service = NativeTlsAcceptorService;
|
||||||
type InitError = ();
|
type InitError = ();
|
||||||
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
|
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
|
||||||
@@ -127,8 +143,10 @@ impl<T: ActixStream + 'static> ServiceFactory<T> for Acceptor {
|
|||||||
Ok(NativeTlsAcceptorService {
|
Ok(NativeTlsAcceptorService {
|
||||||
acceptor: self.acceptor.clone(),
|
acceptor: self.acceptor.clone(),
|
||||||
conns: conns.clone(),
|
conns: conns.clone(),
|
||||||
|
handshake_timeout: self.handshake_timeout,
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
|
|
||||||
Box::pin(async { res })
|
Box::pin(async { res })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -136,12 +154,13 @@ impl<T: ActixStream + 'static> ServiceFactory<T> for Acceptor {
|
|||||||
pub struct NativeTlsAcceptorService {
|
pub struct NativeTlsAcceptorService {
|
||||||
acceptor: TlsAcceptor,
|
acceptor: TlsAcceptor,
|
||||||
conns: Counter,
|
conns: Counter,
|
||||||
|
handshake_timeout: Duration,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: ActixStream + 'static> Service<T> for NativeTlsAcceptorService {
|
impl<T: ActixStream + 'static> Service<T> for NativeTlsAcceptorService {
|
||||||
type Response = TlsStream<T>;
|
type Response = TlsStream<T>;
|
||||||
type Error = Error;
|
type Error = TlsError<Error, Infallible>;
|
||||||
type Future = LocalBoxFuture<'static, Result<TlsStream<T>, Error>>;
|
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
|
||||||
|
|
||||||
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
if self.conns.available(cx) {
|
if self.conns.available(cx) {
|
||||||
@@ -154,10 +173,18 @@ impl<T: ActixStream + 'static> Service<T> for NativeTlsAcceptorService {
|
|||||||
fn call(&self, io: T) -> Self::Future {
|
fn call(&self, io: T) -> Self::Future {
|
||||||
let guard = self.conns.get();
|
let guard = self.conns.get();
|
||||||
let acceptor = self.acceptor.clone();
|
let acceptor = self.acceptor.clone();
|
||||||
|
|
||||||
|
let dur = self.handshake_timeout;
|
||||||
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
let io = acceptor.accept(io).await;
|
match timeout(dur, acceptor.accept(io)).await {
|
||||||
|
Ok(Ok(io)) => {
|
||||||
drop(guard);
|
drop(guard);
|
||||||
io.map(Into::into)
|
Ok(TlsStream(io))
|
||||||
|
}
|
||||||
|
Ok(Err(err)) => Err(TlsError::Tls(err)),
|
||||||
|
Err(_timeout) => Err(TlsError::Timeout),
|
||||||
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -1,22 +1,28 @@
|
|||||||
use std::{
|
use std::{
|
||||||
|
convert::Infallible,
|
||||||
future::Future,
|
future::Future,
|
||||||
io::{self, IoSlice},
|
io::{self, IoSlice},
|
||||||
ops::{Deref, DerefMut},
|
ops::{Deref, DerefMut},
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
use actix_codec::{AsyncRead, AsyncWrite, ReadBuf};
|
use actix_codec::{AsyncRead, AsyncWrite, ReadBuf};
|
||||||
use actix_rt::net::{ActixStream, Ready};
|
use actix_rt::{
|
||||||
|
net::{ActixStream, Ready},
|
||||||
|
time::{sleep, Sleep},
|
||||||
|
};
|
||||||
use actix_service::{Service, ServiceFactory};
|
use actix_service::{Service, ServiceFactory};
|
||||||
use actix_utils::counter::{Counter, CounterGuard};
|
use actix_utils::counter::{Counter, CounterGuard};
|
||||||
use futures_core::{future::LocalBoxFuture, ready};
|
use futures_core::future::LocalBoxFuture;
|
||||||
|
|
||||||
pub use openssl::ssl::{
|
pub use openssl::ssl::{
|
||||||
AlpnError, Error as SslError, HandshakeError, Ssl, SslAcceptor, SslAcceptorBuilder,
|
AlpnError, Error as SslError, HandshakeError, Ssl, SslAcceptor, SslAcceptorBuilder,
|
||||||
};
|
};
|
||||||
|
use pin_project_lite::pin_project;
|
||||||
|
|
||||||
use super::MAX_CONN_COUNTER;
|
use super::{TlsError, DEFAULT_TLS_HANDSHAKE_TIMEOUT, MAX_CONN_COUNTER};
|
||||||
|
|
||||||
/// Wrapper type for `tokio_openssl::SslStream` in order to impl `ActixStream` trait.
|
/// Wrapper type for `tokio_openssl::SslStream` in order to impl `ActixStream` trait.
|
||||||
pub struct TlsStream<T>(tokio_openssl::SslStream<T>);
|
pub struct TlsStream<T>(tokio_openssl::SslStream<T>);
|
||||||
@@ -96,13 +102,25 @@ impl<T: ActixStream> ActixStream for TlsStream<T> {
|
|||||||
/// `openssl` feature enables this `Acceptor` type.
|
/// `openssl` feature enables this `Acceptor` type.
|
||||||
pub struct Acceptor {
|
pub struct Acceptor {
|
||||||
acceptor: SslAcceptor,
|
acceptor: SslAcceptor,
|
||||||
|
handshake_timeout: Duration,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Acceptor {
|
impl Acceptor {
|
||||||
/// Create OpenSSL based `Acceptor` service factory.
|
/// Create OpenSSL based `Acceptor` service factory.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn new(acceptor: SslAcceptor) -> Self {
|
pub fn new(acceptor: SslAcceptor) -> Self {
|
||||||
Acceptor { acceptor }
|
Acceptor {
|
||||||
|
acceptor,
|
||||||
|
handshake_timeout: DEFAULT_TLS_HANDSHAKE_TIMEOUT,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Limit the amount of time that the acceptor will wait for a TLS handshake to complete.
|
||||||
|
///
|
||||||
|
/// Default timeout is 3 seconds.
|
||||||
|
pub fn set_handshake_timeout(&mut self, handshake_timeout: Duration) -> &mut Self {
|
||||||
|
self.handshake_timeout = handshake_timeout;
|
||||||
|
self
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -111,13 +129,14 @@ impl Clone for Acceptor {
|
|||||||
fn clone(&self) -> Self {
|
fn clone(&self) -> Self {
|
||||||
Self {
|
Self {
|
||||||
acceptor: self.acceptor.clone(),
|
acceptor: self.acceptor.clone(),
|
||||||
|
handshake_timeout: self.handshake_timeout,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: ActixStream> ServiceFactory<T> for Acceptor {
|
impl<T: ActixStream> ServiceFactory<T> for Acceptor {
|
||||||
type Response = TlsStream<T>;
|
type Response = TlsStream<T>;
|
||||||
type Error = SslError;
|
type Error = TlsError<SslError, Infallible>;
|
||||||
type Config = ();
|
type Config = ();
|
||||||
type Service = AcceptorService;
|
type Service = AcceptorService;
|
||||||
type InitError = ();
|
type InitError = ();
|
||||||
@@ -128,8 +147,10 @@ impl<T: ActixStream> ServiceFactory<T> for Acceptor {
|
|||||||
Ok(AcceptorService {
|
Ok(AcceptorService {
|
||||||
acceptor: self.acceptor.clone(),
|
acceptor: self.acceptor.clone(),
|
||||||
conns: conns.clone(),
|
conns: conns.clone(),
|
||||||
|
handshake_timeout: self.handshake_timeout,
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
|
|
||||||
Box::pin(async { res })
|
Box::pin(async { res })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -137,11 +158,12 @@ impl<T: ActixStream> ServiceFactory<T> for Acceptor {
|
|||||||
pub struct AcceptorService {
|
pub struct AcceptorService {
|
||||||
acceptor: SslAcceptor,
|
acceptor: SslAcceptor,
|
||||||
conns: Counter,
|
conns: Counter,
|
||||||
|
handshake_timeout: Duration,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: ActixStream> Service<T> for AcceptorService {
|
impl<T: ActixStream> Service<T> for AcceptorService {
|
||||||
type Response = TlsStream<T>;
|
type Response = TlsStream<T>;
|
||||||
type Error = SslError;
|
type Error = TlsError<SslError, Infallible>;
|
||||||
type Future = AcceptorServiceResponse<T>;
|
type Future = AcceptorServiceResponse<T>;
|
||||||
|
|
||||||
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
@@ -155,27 +177,38 @@ impl<T: ActixStream> Service<T> for AcceptorService {
|
|||||||
fn call(&self, io: T) -> Self::Future {
|
fn call(&self, io: T) -> Self::Future {
|
||||||
let ssl_ctx = self.acceptor.context();
|
let ssl_ctx = self.acceptor.context();
|
||||||
let ssl = Ssl::new(ssl_ctx).expect("Provided SSL acceptor was invalid.");
|
let ssl = Ssl::new(ssl_ctx).expect("Provided SSL acceptor was invalid.");
|
||||||
|
|
||||||
AcceptorServiceResponse {
|
AcceptorServiceResponse {
|
||||||
_guard: self.conns.get(),
|
_guard: self.conns.get(),
|
||||||
|
timeout: sleep(self.handshake_timeout),
|
||||||
stream: Some(tokio_openssl::SslStream::new(ssl, io).unwrap()),
|
stream: Some(tokio_openssl::SslStream::new(ssl, io).unwrap()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct AcceptorServiceResponse<T: ActixStream> {
|
pin_project! {
|
||||||
|
pub struct AcceptorServiceResponse<T: ActixStream> {
|
||||||
stream: Option<tokio_openssl::SslStream<T>>,
|
stream: Option<tokio_openssl::SslStream<T>>,
|
||||||
|
#[pin]
|
||||||
|
timeout: Sleep,
|
||||||
_guard: CounterGuard,
|
_guard: CounterGuard,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: ActixStream> Future for AcceptorServiceResponse<T> {
|
impl<T: ActixStream> Future for AcceptorServiceResponse<T> {
|
||||||
type Output = Result<TlsStream<T>, SslError>;
|
type Output = Result<TlsStream<T>, TlsError<SslError, Infallible>>;
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
ready!(Pin::new(self.stream.as_mut().unwrap()).poll_accept(cx))?;
|
let this = self.project();
|
||||||
Poll::Ready(Ok(self
|
|
||||||
|
match Pin::new(this.stream.as_mut().unwrap()).poll_accept(cx) {
|
||||||
|
Poll::Ready(Ok(())) => Poll::Ready(Ok(this
|
||||||
.stream
|
.stream
|
||||||
.take()
|
.take()
|
||||||
.expect("SSL connect has resolved.")
|
.expect("Acceptor should not be polled after it has completed.")
|
||||||
.into()))
|
.into())),
|
||||||
|
Poll::Ready(Err(err)) => Poll::Ready(Err(TlsError::Tls(err))),
|
||||||
|
Poll::Pending => this.timeout.poll(cx).map(|_| Err(TlsError::Timeout)),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -1,22 +1,28 @@
|
|||||||
use std::{
|
use std::{
|
||||||
|
convert::Infallible,
|
||||||
future::Future,
|
future::Future,
|
||||||
io::{self, IoSlice},
|
io::{self, IoSlice},
|
||||||
ops::{Deref, DerefMut},
|
ops::{Deref, DerefMut},
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
use actix_codec::{AsyncRead, AsyncWrite, ReadBuf};
|
use actix_codec::{AsyncRead, AsyncWrite, ReadBuf};
|
||||||
use actix_rt::net::{ActixStream, Ready};
|
use actix_rt::{
|
||||||
|
net::{ActixStream, Ready},
|
||||||
|
time::{sleep, Sleep},
|
||||||
|
};
|
||||||
use actix_service::{Service, ServiceFactory};
|
use actix_service::{Service, ServiceFactory};
|
||||||
use actix_utils::counter::{Counter, CounterGuard};
|
use actix_utils::counter::{Counter, CounterGuard};
|
||||||
use futures_core::future::LocalBoxFuture;
|
use futures_core::future::LocalBoxFuture;
|
||||||
|
use pin_project_lite::pin_project;
|
||||||
use tokio_rustls::{Accept, TlsAcceptor};
|
use tokio_rustls::{Accept, TlsAcceptor};
|
||||||
|
|
||||||
pub use tokio_rustls::rustls::ServerConfig;
|
pub use tokio_rustls::rustls::ServerConfig;
|
||||||
|
|
||||||
use super::MAX_CONN_COUNTER;
|
use super::{TlsError, DEFAULT_TLS_HANDSHAKE_TIMEOUT, MAX_CONN_COUNTER};
|
||||||
|
|
||||||
/// Wrapper type for `tokio_openssl::SslStream` in order to impl `ActixStream` trait.
|
/// Wrapper type for `tokio_openssl::SslStream` in order to impl `ActixStream` trait.
|
||||||
pub struct TlsStream<T>(tokio_rustls::server::TlsStream<T>);
|
pub struct TlsStream<T>(tokio_rustls::server::TlsStream<T>);
|
||||||
@@ -96,6 +102,7 @@ impl<T: ActixStream> ActixStream for TlsStream<T> {
|
|||||||
/// `rustls` feature enables this `Acceptor` type.
|
/// `rustls` feature enables this `Acceptor` type.
|
||||||
pub struct Acceptor {
|
pub struct Acceptor {
|
||||||
config: Arc<ServerConfig>,
|
config: Arc<ServerConfig>,
|
||||||
|
handshake_timeout: Duration,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Acceptor {
|
impl Acceptor {
|
||||||
@@ -104,8 +111,17 @@ impl Acceptor {
|
|||||||
pub fn new(config: ServerConfig) -> Self {
|
pub fn new(config: ServerConfig) -> Self {
|
||||||
Acceptor {
|
Acceptor {
|
||||||
config: Arc::new(config),
|
config: Arc::new(config),
|
||||||
|
handshake_timeout: DEFAULT_TLS_HANDSHAKE_TIMEOUT,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Limit the amount of time that the acceptor will wait for a TLS handshake to complete.
|
||||||
|
///
|
||||||
|
/// Default timeout is 3 seconds.
|
||||||
|
pub fn set_handshake_timeout(&mut self, handshake_timeout: Duration) -> &mut Self {
|
||||||
|
self.handshake_timeout = handshake_timeout;
|
||||||
|
self
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Clone for Acceptor {
|
impl Clone for Acceptor {
|
||||||
@@ -113,15 +129,15 @@ impl Clone for Acceptor {
|
|||||||
fn clone(&self) -> Self {
|
fn clone(&self) -> Self {
|
||||||
Self {
|
Self {
|
||||||
config: self.config.clone(),
|
config: self.config.clone(),
|
||||||
|
handshake_timeout: self.handshake_timeout,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: ActixStream> ServiceFactory<T> for Acceptor {
|
impl<T: ActixStream> ServiceFactory<T> for Acceptor {
|
||||||
type Response = TlsStream<T>;
|
type Response = TlsStream<T>;
|
||||||
type Error = io::Error;
|
type Error = TlsError<io::Error, Infallible>;
|
||||||
type Config = ();
|
type Config = ();
|
||||||
|
|
||||||
type Service = AcceptorService;
|
type Service = AcceptorService;
|
||||||
type InitError = ();
|
type InitError = ();
|
||||||
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
|
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
|
||||||
@@ -131,8 +147,10 @@ impl<T: ActixStream> ServiceFactory<T> for Acceptor {
|
|||||||
Ok(AcceptorService {
|
Ok(AcceptorService {
|
||||||
acceptor: self.config.clone().into(),
|
acceptor: self.config.clone().into(),
|
||||||
conns: conns.clone(),
|
conns: conns.clone(),
|
||||||
|
handshake_timeout: self.handshake_timeout,
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
|
|
||||||
Box::pin(async { res })
|
Box::pin(async { res })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -141,11 +159,12 @@ impl<T: ActixStream> ServiceFactory<T> for Acceptor {
|
|||||||
pub struct AcceptorService {
|
pub struct AcceptorService {
|
||||||
acceptor: TlsAcceptor,
|
acceptor: TlsAcceptor,
|
||||||
conns: Counter,
|
conns: Counter,
|
||||||
|
handshake_timeout: Duration,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: ActixStream> Service<T> for AcceptorService {
|
impl<T: ActixStream> Service<T> for AcceptorService {
|
||||||
type Response = TlsStream<T>;
|
type Response = TlsStream<T>;
|
||||||
type Error = io::Error;
|
type Error = TlsError<io::Error, Infallible>;
|
||||||
type Future = AcceptorServiceFut<T>;
|
type Future = AcceptorServiceFut<T>;
|
||||||
|
|
||||||
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
@@ -158,22 +177,31 @@ impl<T: ActixStream> Service<T> for AcceptorService {
|
|||||||
|
|
||||||
fn call(&self, req: T) -> Self::Future {
|
fn call(&self, req: T) -> Self::Future {
|
||||||
AcceptorServiceFut {
|
AcceptorServiceFut {
|
||||||
_guard: self.conns.get(),
|
|
||||||
fut: self.acceptor.accept(req),
|
fut: self.acceptor.accept(req),
|
||||||
|
timeout: sleep(self.handshake_timeout),
|
||||||
|
_guard: self.conns.get(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct AcceptorServiceFut<T: ActixStream> {
|
pin_project! {
|
||||||
|
pub struct AcceptorServiceFut<T: ActixStream> {
|
||||||
fut: Accept<T>,
|
fut: Accept<T>,
|
||||||
|
#[pin]
|
||||||
|
timeout: Sleep,
|
||||||
_guard: CounterGuard,
|
_guard: CounterGuard,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: ActixStream> Future for AcceptorServiceFut<T> {
|
impl<T: ActixStream> Future for AcceptorServiceFut<T> {
|
||||||
type Output = Result<TlsStream<T>, io::Error>;
|
type Output = Result<TlsStream<T>, TlsError<io::Error, Infallible>>;
|
||||||
|
|
||||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
let this = self.get_mut();
|
let mut this = self.project();
|
||||||
Pin::new(&mut this.fut).poll(cx).map_ok(TlsStream)
|
match Pin::new(&mut this.fut).poll(cx) {
|
||||||
|
Poll::Ready(Ok(stream)) => Poll::Ready(Ok(TlsStream(stream))),
|
||||||
|
Poll::Ready(Err(err)) => Poll::Ready(Err(TlsError::Tls(err))),
|
||||||
|
Poll::Pending => this.timeout.poll(cx).map(|_| Err(TlsError::Timeout)),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -63,16 +63,16 @@ impl From<Option<SocketAddr>> for ConnectAddrs {
|
|||||||
|
|
||||||
/// Connection info.
|
/// Connection info.
|
||||||
#[derive(Debug, PartialEq, Eq, Hash)]
|
#[derive(Debug, PartialEq, Eq, Hash)]
|
||||||
pub struct Connect<T> {
|
pub struct Connect<R> {
|
||||||
pub(crate) req: T,
|
pub(crate) req: R,
|
||||||
pub(crate) port: u16,
|
pub(crate) port: u16,
|
||||||
pub(crate) addr: ConnectAddrs,
|
pub(crate) addr: ConnectAddrs,
|
||||||
pub(crate) local_addr: Option<IpAddr>,
|
pub(crate) local_addr: Option<IpAddr>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Address> Connect<T> {
|
impl<R: Address> Connect<R> {
|
||||||
/// Create `Connect` instance by splitting the string by ':' and convert the second part to u16
|
/// Create `Connect` instance by splitting the string by ':' and convert the second part to u16
|
||||||
pub fn new(req: T) -> Connect<T> {
|
pub fn new(req: R) -> Connect<R> {
|
||||||
let (_, port) = parse_host(req.hostname());
|
let (_, port) = parse_host(req.hostname());
|
||||||
|
|
||||||
Connect {
|
Connect {
|
||||||
@@ -85,7 +85,7 @@ impl<T: Address> Connect<T> {
|
|||||||
|
|
||||||
/// Create new `Connect` instance from host and address. Connector skips name resolution stage
|
/// Create new `Connect` instance from host and address. Connector skips name resolution stage
|
||||||
/// for such connect messages.
|
/// for such connect messages.
|
||||||
pub fn with_addr(req: T, addr: SocketAddr) -> Connect<T> {
|
pub fn with_addr(req: R, addr: SocketAddr) -> Connect<R> {
|
||||||
Connect {
|
Connect {
|
||||||
req,
|
req,
|
||||||
port: 0,
|
port: 0,
|
||||||
@@ -155,15 +155,20 @@ impl<T: Address> Connect<T> {
|
|||||||
ConnectAddrs::Multi(addrs) => ConnectAddrsIter::MultiOwned(addrs.into_iter()),
|
ConnectAddrs::Multi(addrs) => ConnectAddrsIter::MultiOwned(addrs.into_iter()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns a reference to the connection request.
|
||||||
|
pub fn request(&self) -> &R {
|
||||||
|
&self.req
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Address> From<T> for Connect<T> {
|
impl<R: Address> From<R> for Connect<R> {
|
||||||
fn from(addr: T) -> Self {
|
fn from(addr: R) -> Self {
|
||||||
Connect::new(addr)
|
Connect::new(addr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Address> fmt::Display for Connect<T> {
|
impl<R: Address> fmt::Display for Connect<R> {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
write!(f, "{}:{}", self.hostname(), self.port())
|
write!(f, "{}:{}", self.hostname(), self.port())
|
||||||
}
|
}
|
||||||
@@ -347,4 +352,10 @@ mod tests {
|
|||||||
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))
|
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn request_ref() {
|
||||||
|
let conn = Connect::new("hello");
|
||||||
|
assert_eq!(conn.request(), &"hello")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@@ -115,7 +115,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub enum RustlsConnectorServiceFuture<T, U> {
|
pub enum RustlsConnectorServiceFuture<T, U> {
|
||||||
/// See issue https://github.com/briansmith/webpki/issues/54
|
/// See issue <https://github.com/briansmith/webpki/issues/54>
|
||||||
InvalidDns,
|
InvalidDns,
|
||||||
Future {
|
Future {
|
||||||
connect: Connect<U>,
|
connect: Connect<U>,
|
||||||
|
130
actix-tls/tests/accept-openssl.rs
Normal file
130
actix-tls/tests/accept-openssl.rs
Normal file
@@ -0,0 +1,130 @@
|
|||||||
|
//! Use Rustls connector to test OpenSSL acceptor.
|
||||||
|
|
||||||
|
#![cfg(all(
|
||||||
|
feature = "accept",
|
||||||
|
feature = "connect",
|
||||||
|
feature = "rustls",
|
||||||
|
feature = "openssl"
|
||||||
|
))]
|
||||||
|
|
||||||
|
use std::{convert::TryFrom, io::Write, sync::Arc};
|
||||||
|
|
||||||
|
use actix_rt::net::TcpStream;
|
||||||
|
use actix_server::TestServer;
|
||||||
|
use actix_service::ServiceFactoryExt as _;
|
||||||
|
use actix_tls::accept::openssl::{Acceptor, TlsStream};
|
||||||
|
use actix_utils::future::ok;
|
||||||
|
use tokio_rustls::rustls::{Certificate, ClientConfig, RootCertStore, ServerName};
|
||||||
|
|
||||||
|
fn new_cert_and_key() -> (String, String) {
|
||||||
|
let cert = rcgen::generate_simple_self_signed(vec![
|
||||||
|
"127.0.0.1".to_owned(),
|
||||||
|
"localhost".to_owned(),
|
||||||
|
])
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let key = cert.serialize_private_key_pem();
|
||||||
|
let cert = cert.serialize_pem().unwrap();
|
||||||
|
|
||||||
|
(cert, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn openssl_acceptor(cert: String, key: String) -> tls_openssl::ssl::SslAcceptor {
|
||||||
|
use tls_openssl::{
|
||||||
|
pkey::PKey,
|
||||||
|
ssl::{SslAcceptor, SslMethod},
|
||||||
|
x509::X509,
|
||||||
|
};
|
||||||
|
|
||||||
|
let cert = X509::from_pem(cert.as_bytes()).unwrap();
|
||||||
|
let key = PKey::private_key_from_pem(key.as_bytes()).unwrap();
|
||||||
|
|
||||||
|
let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap();
|
||||||
|
builder.set_certificate(&cert).unwrap();
|
||||||
|
builder.set_private_key(&key).unwrap();
|
||||||
|
builder.set_alpn_select_callback(|_, _protocols| Ok(b"http/1.1"));
|
||||||
|
builder.set_alpn_protos(b"\x08http/1.1").unwrap();
|
||||||
|
builder.build()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[allow(dead_code)]
|
||||||
|
mod danger {
|
||||||
|
use std::time::SystemTime;
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
use tokio_rustls::rustls::{
|
||||||
|
self,
|
||||||
|
client::{ServerCertVerified, ServerCertVerifier},
|
||||||
|
};
|
||||||
|
|
||||||
|
pub struct NoCertificateVerification;
|
||||||
|
|
||||||
|
impl ServerCertVerifier for NoCertificateVerification {
|
||||||
|
fn verify_server_cert(
|
||||||
|
&self,
|
||||||
|
_end_entity: &Certificate,
|
||||||
|
_intermediates: &[Certificate],
|
||||||
|
_server_name: &ServerName,
|
||||||
|
_scts: &mut dyn Iterator<Item = &[u8]>,
|
||||||
|
_ocsp_response: &[u8],
|
||||||
|
_now: SystemTime,
|
||||||
|
) -> Result<ServerCertVerified, rustls::Error> {
|
||||||
|
Ok(ServerCertVerified::assertion())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[allow(dead_code)]
|
||||||
|
fn rustls_connector(_cert: String, _key: String) -> ClientConfig {
|
||||||
|
let mut config = ClientConfig::builder()
|
||||||
|
.with_safe_defaults()
|
||||||
|
.with_root_certificates(RootCertStore::empty())
|
||||||
|
.with_no_client_auth();
|
||||||
|
|
||||||
|
config
|
||||||
|
.dangerous()
|
||||||
|
.set_certificate_verifier(Arc::new(danger::NoCertificateVerification));
|
||||||
|
|
||||||
|
config.alpn_protocols = vec![b"http/1.1".to_vec()];
|
||||||
|
config
|
||||||
|
}
|
||||||
|
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn accepts_connections() {
|
||||||
|
let (cert, key) = new_cert_and_key();
|
||||||
|
|
||||||
|
let srv = TestServer::with({
|
||||||
|
let cert = cert.clone();
|
||||||
|
let key = key.clone();
|
||||||
|
|
||||||
|
move || {
|
||||||
|
let openssl_acceptor = openssl_acceptor(cert.clone(), key.clone());
|
||||||
|
let tls_acceptor = Acceptor::new(openssl_acceptor);
|
||||||
|
|
||||||
|
tls_acceptor
|
||||||
|
.map_err(|err| println!("OpenSSL error: {:?}", err))
|
||||||
|
.and_then(move |_stream: TlsStream<TcpStream>| ok(()))
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut sock = srv
|
||||||
|
.connect()
|
||||||
|
.expect("cannot connect to test server")
|
||||||
|
.into_std()
|
||||||
|
.unwrap();
|
||||||
|
sock.set_nonblocking(false).unwrap();
|
||||||
|
|
||||||
|
let config = rustls_connector(cert, key);
|
||||||
|
let config = Arc::new(config);
|
||||||
|
|
||||||
|
let mut conn = tokio_rustls::rustls::ClientConnection::new(
|
||||||
|
config,
|
||||||
|
ServerName::try_from("localhost").unwrap(),
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let mut stream = tokio_rustls::rustls::Stream::new(&mut conn, &mut sock);
|
||||||
|
|
||||||
|
stream.flush().expect("TLS handshake failed");
|
||||||
|
}
|
104
actix-tls/tests/accept-rustls.rs
Normal file
104
actix-tls/tests/accept-rustls.rs
Normal file
@@ -0,0 +1,104 @@
|
|||||||
|
//! Use OpenSSL connector to test Rustls acceptor.
|
||||||
|
|
||||||
|
#![cfg(all(
|
||||||
|
feature = "accept",
|
||||||
|
feature = "connect",
|
||||||
|
feature = "rustls",
|
||||||
|
feature = "openssl"
|
||||||
|
))]
|
||||||
|
|
||||||
|
use std::io::{BufReader, Write};
|
||||||
|
|
||||||
|
use actix_rt::net::TcpStream;
|
||||||
|
use actix_server::TestServer;
|
||||||
|
use actix_service::ServiceFactoryExt as _;
|
||||||
|
use actix_tls::accept::rustls::{Acceptor, TlsStream};
|
||||||
|
use actix_tls::connect::tls::openssl::SslConnector;
|
||||||
|
use actix_utils::future::ok;
|
||||||
|
use rustls_pemfile::{certs, pkcs8_private_keys};
|
||||||
|
use tls_openssl::ssl::SslVerifyMode;
|
||||||
|
use tokio_rustls::rustls::{self, Certificate, PrivateKey, ServerConfig};
|
||||||
|
|
||||||
|
fn new_cert_and_key() -> (String, String) {
|
||||||
|
let cert = rcgen::generate_simple_self_signed(vec![
|
||||||
|
"127.0.0.1".to_owned(),
|
||||||
|
"localhost".to_owned(),
|
||||||
|
])
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let key = cert.serialize_private_key_pem();
|
||||||
|
let cert = cert.serialize_pem().unwrap();
|
||||||
|
|
||||||
|
(cert, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn rustls_server_config(cert: String, key: String) -> rustls::ServerConfig {
|
||||||
|
// Load TLS key and cert files
|
||||||
|
|
||||||
|
let cert = &mut BufReader::new(cert.as_bytes());
|
||||||
|
let key = &mut BufReader::new(key.as_bytes());
|
||||||
|
|
||||||
|
let cert_chain = certs(cert).unwrap().into_iter().map(Certificate).collect();
|
||||||
|
let mut keys = pkcs8_private_keys(key).unwrap();
|
||||||
|
|
||||||
|
let mut config = ServerConfig::builder()
|
||||||
|
.with_safe_defaults()
|
||||||
|
.with_no_client_auth()
|
||||||
|
.with_single_cert(cert_chain, PrivateKey(keys.remove(0)))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
config.alpn_protocols = vec![b"http/1.1".to_vec()];
|
||||||
|
|
||||||
|
config
|
||||||
|
}
|
||||||
|
|
||||||
|
fn openssl_connector(cert: String, key: String) -> SslConnector {
|
||||||
|
use actix_tls::connect::tls::openssl::{SslConnector as OpensslConnector, SslMethod};
|
||||||
|
use tls_openssl::{pkey::PKey, x509::X509};
|
||||||
|
|
||||||
|
let cert = X509::from_pem(cert.as_bytes()).unwrap();
|
||||||
|
let key = PKey::private_key_from_pem(key.as_bytes()).unwrap();
|
||||||
|
|
||||||
|
let mut ssl = OpensslConnector::builder(SslMethod::tls()).unwrap();
|
||||||
|
ssl.set_verify(SslVerifyMode::NONE);
|
||||||
|
ssl.set_certificate(&cert).unwrap();
|
||||||
|
ssl.set_private_key(&key).unwrap();
|
||||||
|
ssl.set_alpn_protos(b"\x08http/1.1").unwrap();
|
||||||
|
|
||||||
|
ssl.build()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn accepts_connections() {
|
||||||
|
let (cert, key) = new_cert_and_key();
|
||||||
|
|
||||||
|
let srv = TestServer::with({
|
||||||
|
let cert = cert.clone();
|
||||||
|
let key = key.clone();
|
||||||
|
|
||||||
|
move || {
|
||||||
|
let tls_acceptor = Acceptor::new(rustls_server_config(cert.clone(), key.clone()));
|
||||||
|
|
||||||
|
tls_acceptor
|
||||||
|
.map_err(|err| println!("Rustls error: {:?}", err))
|
||||||
|
.and_then(move |_stream: TlsStream<TcpStream>| ok(()))
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let sock = srv
|
||||||
|
.connect()
|
||||||
|
.expect("cannot connect to test server")
|
||||||
|
.into_std()
|
||||||
|
.unwrap();
|
||||||
|
sock.set_nonblocking(false).unwrap();
|
||||||
|
|
||||||
|
let connector = openssl_connector(cert, key);
|
||||||
|
|
||||||
|
let mut stream = connector
|
||||||
|
.connect("localhost", sock)
|
||||||
|
.expect("TLS handshake failed");
|
||||||
|
|
||||||
|
stream.do_handshake().expect("TLS handshake failed");
|
||||||
|
|
||||||
|
stream.flush().expect("TLS handshake failed");
|
||||||
|
}
|
@@ -23,3 +23,4 @@ local-waker = "0.1"
|
|||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
actix-rt = "2.0.0"
|
actix-rt = "2.0.0"
|
||||||
futures-util = { version = "0.3.7", default-features = false }
|
futures-util = { version = "0.3.7", default-features = false }
|
||||||
|
static_assertions = "1.1"
|
||||||
|
@@ -26,7 +26,7 @@ impl Counter {
|
|||||||
CounterGuard::new(self.0.clone())
|
CounterGuard::new(self.0.clone())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Notify current task and return true if counter is at capacity.
|
/// Returns true if counter is below capacity. Otherwise, register to wake task when it is.
|
||||||
pub fn available(&self, cx: &mut task::Context<'_>) -> bool {
|
pub fn available(&self, cx: &mut task::Context<'_>) -> bool {
|
||||||
self.0.available(cx)
|
self.0.available(cx)
|
||||||
}
|
}
|
||||||
|
@@ -103,10 +103,16 @@ pub fn err<T, E>(err: E) -> Ready<Result<T, E>> {
|
|||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
|
use std::rc::Rc;
|
||||||
|
|
||||||
use futures_util::task::noop_waker;
|
use futures_util::task::noop_waker;
|
||||||
|
use static_assertions::{assert_impl_all, assert_not_impl_all};
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
|
assert_impl_all!(Ready<()>: Send, Sync, Clone);
|
||||||
|
assert_not_impl_all!(Ready<Rc<()>>: Send, Sync);
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
#[should_panic]
|
#[should_panic]
|
||||||
fn multiple_poll_panics() {
|
fn multiple_poll_panics() {
|
||||||
|
Reference in New Issue
Block a user