1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-08-31 18:36:59 +02:00

Compare commits

...

22 Commits

Author SHA1 Message Date
Rob Ede
a2437eed29 prepare actix-tls release 3.0.0-beta.9 2021-11-22 13:34:54 +00:00
Rob Ede
67b357a175 add TlsError::into_service_error (#420) 2021-11-22 13:33:20 +00:00
Rob Ede
3597af5c45 prepare actix-rt release 2.5.0 2021-11-22 01:15:18 +00:00
Rob Ede
8891c2681e address unused warning 2021-11-21 23:42:51 +00:00
Rob Ede
233c61ba08 remove dead code 2021-11-21 23:29:25 +00:00
Rob Ede
161f239f12 server: panic earlier if neither runtime detected 2021-11-21 23:29:06 +00:00
fakeshadow
7e7df2f931 add timeout for accepting tls connections (#393)
Co-authored-by: Rob Ede <robjtede@icloud.com>
2021-11-16 00:22:24 +00:00
Luca Bruno
ce8ec15eaa system: run and return exit code on stop (#411)
Co-authored-by: Rob Ede <robjtede@icloud.com>
2021-11-15 18:49:02 +00:00
Rob Ede
ae28ce5377 update mio to 0.8 2021-11-15 18:48:37 +00:00
Rob Ede
54d1d9e520 prepare actix-tls release 3.0.0-beta.8 2021-11-15 17:55:23 +00:00
Alexander Polakov
0b0cbd5388 actix-tls: allow getting uri from Connect (#415)
Co-authored-by: Rob Ede <robjtede@icloud.com>
2021-11-15 10:39:42 +00:00
Rob Ede
443a328fb4 prepare actix-server release 2.0.0-beta.9 2021-11-15 02:39:55 +00:00
Rob Ede
58a67ade32 improve docs of system_exit 2021-11-15 02:33:13 +00:00
Rob Ede
38caa8f088 Fix server arbiter support (#417) 2021-11-14 19:45:15 +00:00
Rob Ede
ed987eef06 prepare actix-server release 2.0.0-beta.8 2021-11-07 15:46:59 +00:00
fakeshadow
3658929010 fix io-uring feature for actix-server (#414)
Co-authored-by: Rob Ede <robjtede@icloud.com>
2021-11-07 15:43:59 +00:00
fakeshadow
3f49d8ab54 remove usage of mio::net::TcpSocket (#413) 2021-11-07 14:18:23 +00:00
fakeshadow
161d1ee94b fix accept timeout and worker graceful shutdown (#412) 2021-11-07 13:00:19 +00:00
Rob Ede
81ba7cafaa fix server non-unix signal impl send (#410) 2021-11-05 02:16:13 +00:00
Rob Ede
f8f51a2240 prepare actix-server release 2.0.0-beta.7 2021-11-05 01:14:28 +00:00
Rob Ede
a2e765ea6e prepare actix-codec release 0.4.1 2021-11-05 01:05:51 +00:00
Rob Ede
03dae6a4a4 prepare actix-rt release 2.4.0 2021-11-05 00:51:34 +00:00
37 changed files with 930 additions and 489 deletions

View File

@@ -14,7 +14,7 @@ ci-check = "hack --workspace --feature-powerset --exclude-features=io-uring chec
ci-check-linux = "hack --workspace --feature-powerset check --tests --examples" ci-check-linux = "hack --workspace --feature-powerset check --tests --examples"
# tests avoiding io-uring feature # tests avoiding io-uring feature
ci-test = "hack test --workspace --exclude=actix-rt --exclude=actix-server --all-features --lib --tests --no-fail-fast -- --nocapture" ci-test = " hack --feature-powerset --exclude=actix-rt --exclude=actix-server --exclude-features=io-uring test --workspace --lib --tests --no-fail-fast -- --nocapture"
ci-test-rt = " hack --feature-powerset --exclude-features=io-uring test --package=actix-rt --lib --tests --no-fail-fast -- --nocapture" ci-test-rt = " hack --feature-powerset --exclude-features=io-uring test --package=actix-rt --lib --tests --no-fail-fast -- --nocapture"
ci-test-server = "hack --feature-powerset --exclude-features=io-uring test --package=actix-server --lib --tests --no-fail-fast -- --nocapture" ci-test-server = "hack --feature-powerset --exclude-features=io-uring test --package=actix-server --lib --tests --no-fail-fast -- --nocapture"

View File

@@ -196,13 +196,6 @@ jobs:
- name: Cache Dependencies - name: Cache Dependencies
uses: Swatinem/rust-cache@v1.3.0 uses: Swatinem/rust-cache@v1.3.0
- name: Install cargo-hack - name: doc tests io-uring
uses: actions-rs/cargo@v1 run: |
with: sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/usr/share/rust/.cargo/bin && RUSTUP_TOOLCHAIN=nightly cargo ci-doctest"
command: install
args: cargo-hack
- name: doc tests
uses: actions-rs/cargo@v1
timeout-minutes: 40
with: { command: ci-doctest }

View File

@@ -1,6 +1,9 @@
# Changes # Changes
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
## 0.4.1 - 2021-11-05
* Added `LinesCodec.` [#338] * Added `LinesCodec.` [#338]
* `Framed::poll_ready` flushes when the buffer is full. [#409] * `Framed::poll_ready` flushes when the buffer is full. [#409]

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "actix-codec" name = "actix-codec"
version = "0.4.0" version = "0.4.1"
authors = [ authors = [
"Nikolay Kim <fafhrd91@gmail.com>", "Nikolay Kim <fafhrd91@gmail.com>",
"Rob Ede <robjtede@icloud.com>", "Rob Ede <robjtede@icloud.com>",

View File

@@ -1,6 +1,15 @@
# Changes # Changes
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
## 2.5.0 - 2021-11-22
* Add `System::run_with_code` to allow retrieving the exit code on stop. [#411]
[#411]: https://github.com/actix/actix-net/pull/411
## 2.4.0 - 2021-11-05
* Add `Arbiter::try_current` for situations where thread may or may not have Arbiter context. [#408] * Add `Arbiter::try_current` for situations where thread may or may not have Arbiter context. [#408]
* Start io-uring with `System::new` when feature is enabled. [#395] * Start io-uring with `System::new` when feature is enabled. [#395]
@@ -105,7 +114,7 @@
[#129]: https://github.com/actix/actix-net/issues/129 [#129]: https://github.com/actix/actix-net/issues/129
## 1.1.0 - 2020-04-08 (YANKED) ## 1.1.0 - 2020-04-08 _(YANKED)_
* Expose `System::is_set` to check if current system has ben started [#99] * Expose `System::is_set` to check if current system has ben started [#99]
* Add `Arbiter::is_running` to check if event loop is running [#124] * Add `Arbiter::is_running` to check if event loop is running [#124]
* Add `Arbiter::local_join` associated function * Add `Arbiter::local_join` associated function

View File

@@ -1,9 +1,10 @@
[package] [package]
name = "actix-rt" name = "actix-rt"
version = "2.3.0" version = "2.5.0"
authors = [ authors = [
"Nikolay Kim <fafhrd91@gmail.com>", "Nikolay Kim <fafhrd91@gmail.com>",
"Rob Ede <robjtede@icloud.com>", "Rob Ede <robjtede@icloud.com>",
"fakeshadow <24548779@qq.com>",
] ]
description = "Tokio-based single-threaded async runtime for the Actix ecosystem" description = "Tokio-based single-threaded async runtime for the Actix ecosystem"
keywords = ["async", "futures", "io", "runtime"] keywords = ["async", "futures", "io", "runtime"]

View File

@@ -3,11 +3,11 @@
> Tokio-based single-threaded async runtime for the Actix ecosystem. > Tokio-based single-threaded async runtime for the Actix ecosystem.
[![crates.io](https://img.shields.io/crates/v/actix-rt?label=latest)](https://crates.io/crates/actix-rt) [![crates.io](https://img.shields.io/crates/v/actix-rt?label=latest)](https://crates.io/crates/actix-rt)
[![Documentation](https://docs.rs/actix-rt/badge.svg?version=2.3.0)](https://docs.rs/actix-rt/2.3.0) [![Documentation](https://docs.rs/actix-rt/badge.svg?version=2.5.0)](https://docs.rs/actix-rt/2.5.0)
[![Version](https://img.shields.io/badge/rustc-1.46+-ab6000.svg)](https://blog.rust-lang.org/2020/03/12/Rust-1.46.html) [![Version](https://img.shields.io/badge/rustc-1.46+-ab6000.svg)](https://blog.rust-lang.org/2020/03/12/Rust-1.46.html)
![MIT or Apache 2.0 licensed](https://img.shields.io/crates/l/actix-rt.svg) ![MIT or Apache 2.0 licensed](https://img.shields.io/crates/l/actix-rt.svg)
<br /> <br />
[![dependency status](https://deps.rs/crate/actix-rt/2.3.0/status.svg)](https://deps.rs/crate/actix-rt/2.3.0) [![dependency status](https://deps.rs/crate/actix-rt/2.5.0/status.svg)](https://deps.rs/crate/actix-rt/2.5.0)
![Download](https://img.shields.io/crates/d/actix-rt.svg) ![Download](https://img.shields.io/crates/d/actix-rt.svg)
[![Chat on Discord](https://img.shields.io/discord/771444961383153695?label=chat&logo=discord)](https://discord.gg/WghFtEH6Hb) [![Chat on Discord](https://img.shields.io/discord/771444961383153695?label=chat&logo=discord)](https://discord.gg/WghFtEH6Hb)

View File

@@ -36,6 +36,9 @@
//! # `io-uring` Support //! # `io-uring` Support
//! There is experimental support for using io-uring with this crate by enabling the //! There is experimental support for using io-uring with this crate by enabling the
//! `io-uring` feature. For now, it is semver exempt. //! `io-uring` feature. For now, it is semver exempt.
//!
//! Note that there are currently some unimplemented parts of using `actix-rt` with `io-uring`.
//! In particular, when running a `System`, only `System::block_on` is supported.
#![deny(rust_2018_idioms, nonstandard_style)] #![deny(rust_2018_idioms, nonstandard_style)]
#![allow(clippy::type_complexity)] #![allow(clippy::type_complexity)]

View File

@@ -67,11 +67,7 @@ impl System {
let sys_ctrl = SystemController::new(sys_rx, stop_tx); let sys_ctrl = SystemController::new(sys_rx, stop_tx);
rt.spawn(sys_ctrl); rt.spawn(sys_ctrl);
SystemRunner { SystemRunner { rt, stop_rx }
rt,
stop_rx,
system,
}
} }
} }
@@ -94,7 +90,7 @@ impl System {
where where
F: Fn() -> tokio::runtime::Runtime, F: Fn() -> tokio::runtime::Runtime,
{ {
unimplemented!("System::with_tokio_rt is not implemented yet") unimplemented!("System::with_tokio_rt is not implemented for io-uring feature yet")
} }
} }
@@ -175,38 +171,37 @@ impl System {
} }
} }
#[cfg(not(feature = "io-uring"))]
/// Runner that keeps a [System]'s event loop alive until stop message is received. /// Runner that keeps a [System]'s event loop alive until stop message is received.
#[cfg(not(feature = "io-uring"))]
#[must_use = "A SystemRunner does nothing unless `run` is called."] #[must_use = "A SystemRunner does nothing unless `run` is called."]
#[derive(Debug)] #[derive(Debug)]
pub struct SystemRunner { pub struct SystemRunner {
rt: crate::runtime::Runtime, rt: crate::runtime::Runtime,
stop_rx: oneshot::Receiver<i32>, stop_rx: oneshot::Receiver<i32>,
#[allow(dead_code)]
system: System,
} }
#[cfg(not(feature = "io-uring"))] #[cfg(not(feature = "io-uring"))]
impl SystemRunner { impl SystemRunner {
/// Starts event loop and will return once [System] is [stopped](System::stop). /// Starts event loop and will return once [System] is [stopped](System::stop).
pub fn run(self) -> io::Result<()> { pub fn run(self) -> io::Result<()> {
let exit_code = self.run_with_code()?;
match exit_code {
0 => Ok(()),
nonzero => Err(io::Error::new(
io::ErrorKind::Other,
format!("Non-zero exit code: {}", nonzero),
)),
}
}
/// Runs the event loop until [stopped](System::stop_with_code), returning the exit code.
pub fn run_with_code(self) -> io::Result<i32> {
let SystemRunner { rt, stop_rx, .. } = self; let SystemRunner { rt, stop_rx, .. } = self;
// run loop // run loop
match rt.block_on(stop_rx) { rt.block_on(stop_rx)
Ok(code) => { .map_err(|err| io::Error::new(io::ErrorKind::Other, err))
if code != 0 {
Err(io::Error::new(
io::ErrorKind::Other,
format!("Non-zero exit code: {}", code),
))
} else {
Ok(())
}
}
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
}
} }
/// Runs the provided future, blocking the current thread until the future completes. /// Runs the provided future, blocking the current thread until the future completes.
@@ -216,8 +211,8 @@ impl SystemRunner {
} }
} }
#[cfg(feature = "io-uring")]
/// Runner that keeps a [System]'s event loop alive until stop message is received. /// Runner that keeps a [System]'s event loop alive until stop message is received.
#[cfg(feature = "io-uring")]
#[must_use = "A SystemRunner does nothing unless `run` is called."] #[must_use = "A SystemRunner does nothing unless `run` is called."]
#[derive(Debug)] #[derive(Debug)]
pub struct SystemRunner; pub struct SystemRunner;
@@ -226,7 +221,14 @@ pub struct SystemRunner;
impl SystemRunner { impl SystemRunner {
/// Starts event loop and will return once [System] is [stopped](System::stop). /// Starts event loop and will return once [System] is [stopped](System::stop).
pub fn run(self) -> io::Result<()> { pub fn run(self) -> io::Result<()> {
unimplemented!("SystemRunner::run is not implemented yet") unimplemented!("SystemRunner::run is not implemented for io-uring feature yet");
}
/// Runs the event loop until [stopped](System::stop_with_code), returning the exit code.
pub fn run_with_code(self) -> io::Result<i32> {
unimplemented!(
"SystemRunner::run_with_code is not implemented for io-uring feature yet"
);
} }
/// Runs the provided future, blocking the current thread until the future completes. /// Runs the provided future, blocking the current thread until the future completes.

View File

@@ -24,6 +24,15 @@ fn await_for_timer() {
); );
} }
#[cfg(not(feature = "io-uring"))]
#[test]
fn run_with_code() {
let sys = System::new();
System::current().stop_with_code(42);
let exit_code = sys.run_with_code().expect("system stop should not error");
assert_eq!(exit_code, 42);
}
#[test] #[test]
fn join_another_arbiter() { fn join_another_arbiter() {
let time = Duration::from_secs(1); let time = Duration::from_secs(1);
@@ -99,8 +108,8 @@ fn wait_for_spawns() {
let handle = rt.spawn(async { let handle = rt.spawn(async {
println!("running on the runtime"); println!("running on the runtime");
// assertion panic is caught at task boundary // panic is caught at task boundary
assert_eq!(1, 2); panic!("intentional test panic");
}); });
assert!(rt.block_on(handle).is_err()); assert!(rt.block_on(handle).is_err());

View File

@@ -1,6 +1,21 @@
# Changes # Changes
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
## 2.0.0-beta.9 - 2021-11-15
* Restore `Arbiter` support lost in `beta.8`. [#417]
[#417]: https://github.com/actix/actix-net/pull/417
## 2.0.0-beta.8 - 2021-11-05 _(YANKED)_
* Fix non-unix signal handler. [#410]
[#410]: https://github.com/actix/actix-net/pull/410
## 2.0.0-beta.7 - 2021-11-05 _(YANKED)_
* Server can be started in regular Tokio runtime. [#408] * Server can be started in regular Tokio runtime. [#408]
* Expose new `Server` type whose `Future` impl resolves when server stops. [#408] * Expose new `Server` type whose `Future` impl resolves when server stops. [#408]
* Rename `Server` to `ServerHandle`. [#407] * Rename `Server` to `ServerHandle`. [#407]

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "actix-server" name = "actix-server"
version = "2.0.0-beta.6" version = "2.0.0-beta.9"
authors = [ authors = [
"Nikolay Kim <fafhrd91@gmail.com>", "Nikolay Kim <fafhrd91@gmail.com>",
"fakeshadow <24548779@qq.com>", "fakeshadow <24548779@qq.com>",
@@ -18,22 +18,27 @@ path = "src/lib.rs"
[features] [features]
default = [] default = []
io-uring = ["actix-rt/io-uring"] io-uring = ["tokio-uring", "actix-rt/io-uring"]
[dependencies] [dependencies]
actix-rt = { version = "2.0.0", default-features = false } actix-rt = { version = "2.4.0", default-features = false }
actix-service = "2.0.0" actix-service = "2.0.0"
actix-utils = "3.0.0" actix-utils = "3.0.0"
futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] } futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }
futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] }
log = "0.4" log = "0.4"
mio = { version = "0.7.6", features = ["os-poll", "net"] } mio = { version = "0.8", features = ["os-poll", "net"] }
num_cpus = "1.13" num_cpus = "1.13"
socket2 = "0.4.2"
tokio = { version = "1.5.1", features = ["sync"] } tokio = { version = "1.5.1", features = ["sync"] }
# runtime for io-uring feature
tokio-uring = { version = "0.1", optional = true }
[dev-dependencies] [dev-dependencies]
actix-codec = "0.4.0" actix-codec = "0.4.0"
actix-rt = "2.0.0" actix-rt = "2.4.0"
bytes = "1" bytes = "1"
env_logger = "0.9" env_logger = "0.9"

View File

@@ -127,7 +127,7 @@ impl Accept {
let mut events = mio::Events::with_capacity(256); let mut events = mio::Events::with_capacity(256);
loop { loop {
if let Err(e) = self.poll.poll(&mut events, None) { if let Err(e) = self.poll.poll(&mut events, self.timeout) {
match e.kind() { match e.kind() {
io::ErrorKind::Interrupted => {} io::ErrorKind::Interrupted => {}
_ => panic!("Poll error: {}", e), _ => panic!("Poll error: {}", e),

View File

@@ -8,7 +8,7 @@ use crate::{
server::ServerCommand, server::ServerCommand,
service::{InternalServiceFactory, ServiceFactory, StreamNewService}, service::{InternalServiceFactory, ServiceFactory, StreamNewService},
socket::{ socket::{
MioListener, MioTcpListener, MioTcpSocket, StdSocketAddr, StdTcpListener, ToSocketAddrs, create_mio_tcp_listener, MioListener, MioTcpListener, StdTcpListener, ToSocketAddrs,
}, },
worker::ServerWorkerConfig, worker::ServerWorkerConfig,
Server, Server,
@@ -112,7 +112,7 @@ impl ServerBuilder {
self.max_concurrent_connections(num) self.max_concurrent_connections(num)
} }
/// Stop Actix system. /// Stop Actix `System` after server shutdown.
pub fn system_exit(mut self) -> Self { pub fn system_exit(mut self) -> Self {
self.exit = true; self.exit = true;
self self
@@ -242,7 +242,8 @@ impl ServerBuilder {
use std::net::{IpAddr, Ipv4Addr}; use std::net::{IpAddr, Ipv4Addr};
lst.set_nonblocking(true)?; lst.set_nonblocking(true)?;
let token = self.next_token(); let token = self.next_token();
let addr = StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); let addr =
crate::socket::StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
self.factories.push(StreamNewService::create( self.factories.push(StreamNewService::create(
name.as_ref().to_string(), name.as_ref().to_string(),
token, token,
@@ -263,7 +264,7 @@ pub(super) fn bind_addr<S: ToSocketAddrs>(
let mut success = false; let mut success = false;
let mut sockets = Vec::new(); let mut sockets = Vec::new();
for addr in addr.to_socket_addrs()? { for addr in addr.to_socket_addrs()? {
match create_tcp_listener(addr, backlog) { match create_mio_tcp_listener(addr, backlog) {
Ok(lst) => { Ok(lst) => {
success = true; success = true;
sockets.push(lst); sockets.push(lst);
@@ -283,14 +284,3 @@ pub(super) fn bind_addr<S: ToSocketAddrs>(
)) ))
} }
} }
fn create_tcp_listener(addr: StdSocketAddr, backlog: u32) -> io::Result<MioTcpListener> {
let socket = match addr {
StdSocketAddr::V4(_) => MioTcpSocket::new_v4()?,
StdSocketAddr::V6(_) => MioTcpSocket::new_v6()?,
};
socket.set_reuseaddr(true)?;
socket.bind(addr)?;
socket.listen(backlog)
}

View File

@@ -42,10 +42,12 @@ impl ServerHandle {
/// Stop incoming connection processing, stop all workers and exit. /// Stop incoming connection processing, stop all workers and exit.
pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> { pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let _ = self.cmd_tx.send(ServerCommand::Stop { let _ = self.cmd_tx.send(ServerCommand::Stop {
graceful, graceful,
completion: Some(tx), completion: Some(tx),
}); });
async { async {
let _ = rx.await; let _ = rx.await;
} }

View File

@@ -4,7 +4,7 @@ use std::{
task::{Context, Poll}, task::{Context, Poll},
}; };
use futures_core::future::{BoxFuture, LocalBoxFuture}; use futures_core::future::BoxFuture;
// a poor man's join future. joined future is only used when starting/stopping the server. // a poor man's join future. joined future is only used when starting/stopping the server.
// pin_project and pinned futures are overkill for this task. // pin_project and pinned futures are overkill for this task.
@@ -61,63 +61,6 @@ impl<T> Future for JoinAll<T> {
} }
} }
pub(crate) fn join_all_local<T>(
fut: Vec<impl Future<Output = T> + 'static>,
) -> JoinAllLocal<T> {
let fut = fut
.into_iter()
.map(|f| JoinLocalFuture::LocalFuture(Box::pin(f)))
.collect();
JoinAllLocal { fut }
}
// a poor man's join future. joined future is only used when starting/stopping the server.
// pin_project and pinned futures are overkill for this task.
pub(crate) struct JoinAllLocal<T> {
fut: Vec<JoinLocalFuture<T>>,
}
enum JoinLocalFuture<T> {
LocalFuture(LocalBoxFuture<'static, T>),
Result(Option<T>),
}
impl<T> Unpin for JoinAllLocal<T> {}
impl<T> Future for JoinAllLocal<T> {
type Output = Vec<T>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut ready = true;
let this = self.get_mut();
for fut in this.fut.iter_mut() {
if let JoinLocalFuture::LocalFuture(f) = fut {
match f.as_mut().poll(cx) {
Poll::Ready(t) => {
*fut = JoinLocalFuture::Result(Some(t));
}
Poll::Pending => ready = false,
}
}
}
if ready {
let mut res = Vec::new();
for fut in this.fut.iter_mut() {
if let JoinLocalFuture::Result(f) = fut {
res.push(f.take().unwrap());
}
}
Poll::Ready(res)
} else {
Poll::Pending
}
}
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*; use super::*;
@@ -132,13 +75,4 @@ mod test {
assert_eq!(Err(3), res.next().unwrap()); assert_eq!(Err(3), res.next().unwrap());
assert_eq!(Ok(9), res.next().unwrap()); assert_eq!(Ok(9), res.next().unwrap());
} }
#[actix_rt::test]
async fn test_join_all_local() {
let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))];
let mut res = join_all_local(futs).await.into_iter();
assert_eq!(Ok(1), res.next().unwrap());
assert_eq!(Err(3), res.next().unwrap());
assert_eq!(Ok(9), res.next().unwrap());
}
} }

View File

@@ -132,15 +132,13 @@ impl Server {
.collect(); .collect();
// Give log information on what runtime will be used. // Give log information on what runtime will be used.
let is_tokio = tokio::runtime::Handle::try_current().is_ok();
let is_actix = actix_rt::System::try_current().is_some(); let is_actix = actix_rt::System::try_current().is_some();
let is_tokio = tokio::runtime::Handle::try_current().is_ok();
match (is_tokio, is_actix) { match (is_actix, is_tokio) {
(true, false) => info!("Tokio runtime found. Starting in existing Tokio runtime"), (true, _) => info!("Actix runtime found; starting in Actix runtime"),
(_, true) => info!("Actix runtime found. Starting in Actix runtime"), (_, true) => info!("Tokio runtime found; starting in existing Tokio runtime"),
(_, _) => info!( (_, false) => panic!("Actix or Tokio runtime not found; halting"),
"Actix/Tokio runtime not found. Starting in newt Tokio current-thread runtime"
),
} }
for (_, name, lst) in &builder.sockets { for (_, name, lst) in &builder.sockets {
@@ -196,11 +194,11 @@ impl Future for Server {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.as_mut().get_mut() { match self.as_mut().get_mut() {
Server::Error(err) => Poll::Ready(Err(err Self::Error(err) => Poll::Ready(Err(err
.take() .take()
.expect("Server future cannot be polled after error"))), .expect("Server future cannot be polled after error"))),
Server::Server(inner) => { Self::Server(inner) => {
// poll Signals // poll Signals
if let Some(ref mut signals) = inner.signals { if let Some(ref mut signals) = inner.signals {
if let Poll::Ready(signal) = Pin::new(signals).poll(cx) { if let Poll::Ready(signal) = Pin::new(signals).poll(cx) {

View File

@@ -35,7 +35,7 @@ impl fmt::Display for Signal {
/// Process signal listener. /// Process signal listener.
pub(crate) struct Signals { pub(crate) struct Signals {
#[cfg(not(unix))] #[cfg(not(unix))]
signals: futures_core::future::LocalBoxFuture<'static, std::io::Result<()>>, signals: futures_core::future::BoxFuture<'static, std::io::Result<()>>,
#[cfg(unix)] #[cfg(unix)]
signals: Vec<(Signal, actix_rt::signal::unix::Signal)>, signals: Vec<(Signal, actix_rt::signal::unix::Signal)>,

View File

@@ -2,7 +2,7 @@ pub(crate) use std::net::{
SocketAddr as StdSocketAddr, TcpListener as StdTcpListener, ToSocketAddrs, SocketAddr as StdSocketAddr, TcpListener as StdTcpListener, ToSocketAddrs,
}; };
pub(crate) use mio::net::{TcpListener as MioTcpListener, TcpSocket as MioTcpSocket}; pub(crate) use mio::net::TcpListener as MioTcpListener;
#[cfg(unix)] #[cfg(unix)]
pub(crate) use { pub(crate) use {
mio::net::UnixListener as MioUnixListener, mio::net::UnixListener as MioUnixListener,
@@ -223,6 +223,22 @@ mod unix_impl {
} }
} }
pub(crate) fn create_mio_tcp_listener(
addr: StdSocketAddr,
backlog: u32,
) -> io::Result<MioTcpListener> {
use socket2::{Domain, Protocol, Socket, Type};
let socket = Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP))?;
socket.set_reuse_address(true)?;
socket.set_nonblocking(true)?;
socket.bind(&addr.into())?;
socket.listen(backlog as i32)?;
Ok(MioTcpListener::from_std(StdTcpListener::from(socket)))
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
@@ -234,11 +250,8 @@ mod tests {
assert_eq!(format!("{}", addr), "127.0.0.1:8080"); assert_eq!(format!("{}", addr), "127.0.0.1:8080");
let addr: StdSocketAddr = "127.0.0.1:0".parse().unwrap(); let addr: StdSocketAddr = "127.0.0.1:0".parse().unwrap();
let socket = MioTcpSocket::new_v4().unwrap(); let lst = create_mio_tcp_listener(addr, 128).unwrap();
socket.set_reuseaddr(true).unwrap(); let lst = MioListener::Tcp(lst);
socket.bind(addr).unwrap();
let tcp = socket.listen(128).unwrap();
let lst = MioListener::Tcp(tcp);
assert!(format!("{:?}", lst).contains("TcpListener")); assert!(format!("{:?}", lst).contains("TcpListener"));
assert!(format!("{}", lst).contains("127.0.0.1")); assert!(format!("{}", lst).contains("127.0.0.1"));
} }

View File

@@ -1,5 +1,4 @@
use std::sync::mpsc; use std::{io, net, sync::mpsc, thread};
use std::{io, net, thread};
use actix_rt::{net::TcpStream, System}; use actix_rt::{net::TcpStream, System};
@@ -105,12 +104,16 @@ impl TestServer {
/// Get first available unused local address. /// Get first available unused local address.
pub fn unused_addr() -> net::SocketAddr { pub fn unused_addr() -> net::SocketAddr {
use socket2::{Domain, Protocol, Socket, Type};
let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap(); let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();
let socket = mio::net::TcpSocket::new_v4().unwrap(); let socket =
socket.bind(addr).unwrap(); Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP)).unwrap();
socket.set_reuseaddr(true).unwrap(); socket.set_reuse_address(true).unwrap();
let tcp = socket.listen(1024).unwrap(); socket.set_nonblocking(true).unwrap();
tcp.local_addr().unwrap() socket.bind(&addr.into()).unwrap();
socket.listen(1024).unwrap();
net::TcpListener::from(socket).local_addr().unwrap()
} }
} }
@@ -147,3 +150,16 @@ impl Drop for TestServerRuntime {
self.stop() self.stop()
} }
} }
#[cfg(test)]
mod tests {
use actix_service::fn_service;
use super::*;
#[tokio::test]
async fn plain_tokio_runtime() {
let srv = TestServer::with(|| fn_service(|_sock| async move { Ok::<_, ()>(()) }));
assert!(srv.connect().is_ok());
}
}

View File

@@ -24,7 +24,6 @@ use tokio::sync::{
}; };
use crate::{ use crate::{
join_all::join_all_local,
service::{BoxedServerService, InternalServiceFactory}, service::{BoxedServerService, InternalServiceFactory},
socket::MioStream, socket::MioStream,
waker_queue::{WakerInterest, WakerQueue}, waker_queue::{WakerInterest, WakerQueue},
@@ -202,8 +201,8 @@ impl WorkerHandleServer {
pub(crate) struct ServerWorker { pub(crate) struct ServerWorker {
// UnboundedReceiver<Conn> should always be the first field. // UnboundedReceiver<Conn> should always be the first field.
// It must be dropped as soon as ServerWorker dropping. // It must be dropped as soon as ServerWorker dropping.
rx: UnboundedReceiver<Conn>, conn_rx: UnboundedReceiver<Conn>,
rx2: UnboundedReceiver<Stop>, stop_rx: UnboundedReceiver<Stop>,
counter: WorkerCounter, counter: WorkerCounter,
services: Box<[WorkerService]>, services: Box<[WorkerService]>,
factories: Box<[Box<dyn InternalServiceFactory>]>, factories: Box<[Box<dyn InternalServiceFactory>]>,
@@ -212,7 +211,7 @@ pub(crate) struct ServerWorker {
} }
struct WorkerService { struct WorkerService {
factory: usize, factory_idx: usize,
status: WorkerServiceStatus, status: WorkerServiceStatus,
service: BoxedServerService, service: BoxedServerService,
} }
@@ -234,6 +233,12 @@ enum WorkerServiceStatus {
Stopped, Stopped,
} }
impl Default for WorkerServiceStatus {
fn default() -> Self {
Self::Unavailable
}
}
/// Config for worker behavior passed down from server builder. /// Config for worker behavior passed down from server builder.
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
pub(crate) struct ServerWorkerConfig { pub(crate) struct ServerWorkerConfig {
@@ -277,14 +282,131 @@ impl ServerWorker {
) -> io::Result<(WorkerHandleAccept, WorkerHandleServer)> { ) -> io::Result<(WorkerHandleAccept, WorkerHandleServer)> {
trace!("starting server worker {}", idx); trace!("starting server worker {}", idx);
let (tx1, rx) = unbounded_channel(); let (tx1, conn_rx) = unbounded_channel();
let (tx2, rx2) = unbounded_channel(); let (tx2, stop_rx) = unbounded_channel();
let counter = Counter::new(config.max_concurrent_connections); let counter = Counter::new(config.max_concurrent_connections);
let pair = handle_pair(idx, tx1, tx2, counter.clone());
let counter_clone = counter.clone(); // get actix system context if it is set
// every worker runs in it's own arbiter. let actix_system = System::try_current();
// get tokio runtime handle if it is set
let tokio_handle = tokio::runtime::Handle::try_current().ok();
// service factories initialization channel
let (factory_tx, factory_rx) = std::sync::mpsc::sync_channel::<io::Result<()>>(1);
// outline of following code:
//
// if system exists
// if uring enabled
// start arbiter using uring method
// else
// start arbiter with regular tokio
// else
// if uring enabled
// start uring in spawned thread
// else
// start regular tokio in spawned thread
// every worker runs in it's own thread and tokio runtime.
// use a custom tokio runtime builder to change the settings of runtime. // use a custom tokio runtime builder to change the settings of runtime.
match (actix_system, tokio_handle) {
(None, None) => {
panic!("No runtime detected. Start a Tokio (or Actix) runtime.");
}
// no actix system
(None, Some(rt_handle)) => {
std::thread::Builder::new()
.name(format!("actix-server worker {}", idx))
.spawn(move || {
let (worker_stopped_tx, worker_stopped_rx) = oneshot::channel();
// local set for running service init futures and worker services
let ls = tokio::task::LocalSet::new();
// init services using existing Tokio runtime (so probably on main thread)
let services = rt_handle.block_on(ls.run_until(async {
let mut services = Vec::new();
for (idx, factory) in factories.iter().enumerate() {
match factory.create().await {
Ok((token, svc)) => services.push((idx, token, svc)),
Err(err) => {
error!("Can not start worker: {:?}", err);
return Err(io::Error::new(
io::ErrorKind::Other,
format!("can not start server service {}", idx),
));
}
}
}
Ok(services)
}));
let services = match services {
Ok(services) => {
factory_tx.send(Ok(())).unwrap();
services
}
Err(err) => {
factory_tx.send(Err(err)).unwrap();
return;
}
};
let worker_services = wrap_worker_services(services);
let worker_fut = async move {
// spawn to make sure ServerWorker runs as non boxed future.
spawn(async move {
ServerWorker {
conn_rx,
stop_rx,
services: worker_services.into_boxed_slice(),
counter: WorkerCounter::new(idx, waker_queue, counter),
factories: factories.into_boxed_slice(),
state: WorkerState::default(),
shutdown_timeout: config.shutdown_timeout,
}
.await;
// wake up outermost task waiting for shutdown
worker_stopped_tx.send(()).unwrap();
});
worker_stopped_rx.await.unwrap();
};
#[cfg(all(target_os = "linux", feature = "io-uring"))]
{
// TODO: pass max blocking thread config when tokio-uring enable configuration
// on building runtime.
let _ = config.max_blocking_threads;
tokio_uring::start(worker_fut);
}
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
{
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.max_blocking_threads(config.max_blocking_threads)
.build()
.unwrap();
rt.block_on(ls.run_until(worker_fut));
}
})
.expect("cannot spawn server worker thread");
}
// with actix system
(Some(_sys), _) => {
#[cfg(all(target_os = "linux", feature = "io-uring"))] #[cfg(all(target_os = "linux", feature = "io-uring"))]
let arbiter = { let arbiter = {
// TODO: pass max blocking thread config when tokio-uring enable configuration // TODO: pass max blocking thread config when tokio-uring enable configuration
@@ -293,89 +415,63 @@ impl ServerWorker {
Arbiter::new() Arbiter::new()
}; };
// get actix system context if it is set #[cfg(not(all(target_os = "linux", feature = "io-uring")))]
let sys = System::try_current(); let arbiter = {
Arbiter::with_tokio_rt(move || {
// service factories initialization channel tokio::runtime::Builder::new_current_thread()
let (factory_tx, factory_rx) = std::sync::mpsc::sync_channel(1);
std::thread::Builder::new()
.name(format!("actix-server worker {}", idx))
.spawn(move || {
// forward existing actix system context
if let Some(sys) = sys {
System::set_current(sys);
}
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all() .enable_all()
.max_blocking_threads(config.max_blocking_threads) .max_blocking_threads(config.max_blocking_threads)
.build() .build()
.unwrap(); .unwrap()
rt.block_on(tokio::task::LocalSet::new().run_until(async move {
let fut = factories
.iter()
.enumerate()
.map(|(idx, factory)| {
let fut = factory.create();
async move { fut.await.map(|(t, s)| (idx, t, s)) }
}) })
.collect::<Vec<_>>();
// a second spawn to run !Send future tasks.
spawn(async move {
let res = join_all_local(fut)
.await
.into_iter()
.collect::<Result<Vec<_>, _>>();
let services = match res {
Ok(res) => res
.into_iter()
.fold(Vec::new(), |mut services, (factory, token, service)| {
assert_eq!(token, services.len());
services.push(WorkerService {
factory,
service,
status: WorkerServiceStatus::Unavailable,
});
services
})
.into_boxed_slice(),
Err(e) => {
error!("Can not start worker: {:?}", e);
Arbiter::try_current().as_ref().map(ArbiterHandle::stop);
return;
}
}; };
factory_tx.send(()).unwrap(); arbiter.spawn(async move {
// spawn_local to run !Send future tasks.
spawn(async move {
let mut services = Vec::new();
// a third spawn to make sure ServerWorker runs as non boxed future. for (idx, factory) in factories.iter().enumerate() {
match factory.create().await {
Ok((token, svc)) => services.push((idx, token, svc)),
Err(err) => {
error!("Can not start worker: {:?}", err);
Arbiter::current().stop();
factory_tx
.send(Err(io::Error::new(
io::ErrorKind::Other,
format!("can not start server service {}", idx),
)))
.unwrap();
return;
}
}
}
factory_tx.send(Ok(())).unwrap();
let worker_services = wrap_worker_services(services);
// spawn to make sure ServerWorker runs as non boxed future.
spawn(ServerWorker { spawn(ServerWorker {
rx, conn_rx,
rx2, stop_rx,
services, services: worker_services.into_boxed_slice(),
counter: WorkerCounter::new(idx, waker_queue, counter_clone), counter: WorkerCounter::new(idx, waker_queue, counter),
factories: factories.into_boxed_slice(), factories: factories.into_boxed_slice(),
state: Default::default(), state: Default::default(),
shutdown_timeout: config.shutdown_timeout, shutdown_timeout: config.shutdown_timeout,
}) });
.await });
.expect("task 3 panic"); });
}) }
.await };
.expect("task 2 panic");
}))
})
.expect("worker thread error/panic");
// wait for service factories initialization // wait for service factories initialization
factory_rx.recv().unwrap(); factory_rx.recv().unwrap()?;
Ok(handle_pair(idx, tx1, tx2, counter)) Ok(pair)
} }
fn restart_service(&mut self, idx: usize, factory_id: usize) { fn restart_service(&mut self, idx: usize, factory_id: usize) {
@@ -413,7 +509,7 @@ impl ServerWorker {
if srv.status == WorkerServiceStatus::Unavailable { if srv.status == WorkerServiceStatus::Unavailable {
trace!( trace!(
"Service {:?} is available", "Service {:?} is available",
self.factories[srv.factory].name(idx) self.factories[srv.factory_idx].name(idx)
); );
srv.status = WorkerServiceStatus::Available; srv.status = WorkerServiceStatus::Available;
} }
@@ -424,7 +520,7 @@ impl ServerWorker {
if srv.status == WorkerServiceStatus::Available { if srv.status == WorkerServiceStatus::Available {
trace!( trace!(
"Service {:?} is unavailable", "Service {:?} is unavailable",
self.factories[srv.factory].name(idx) self.factories[srv.factory_idx].name(idx)
); );
srv.status = WorkerServiceStatus::Unavailable; srv.status = WorkerServiceStatus::Unavailable;
} }
@@ -432,10 +528,10 @@ impl ServerWorker {
Poll::Ready(Err(_)) => { Poll::Ready(Err(_)) => {
error!( error!(
"Service {:?} readiness check returned error, restarting", "Service {:?} readiness check returned error, restarting",
self.factories[srv.factory].name(idx) self.factories[srv.factory_idx].name(idx)
); );
srv.status = WorkerServiceStatus::Failed; srv.status = WorkerServiceStatus::Failed;
return Err((idx, srv.factory)); return Err((idx, srv.factory_idx));
} }
} }
} }
@@ -478,7 +574,6 @@ impl Default for WorkerState {
impl Drop for ServerWorker { impl Drop for ServerWorker {
fn drop(&mut self) { fn drop(&mut self) {
trace!("stopping ServerWorker Arbiter");
Arbiter::try_current().as_ref().map(ArbiterHandle::stop); Arbiter::try_current().as_ref().map(ArbiterHandle::stop);
} }
} }
@@ -490,7 +585,8 @@ impl Future for ServerWorker {
let this = self.as_mut().get_mut(); let this = self.as_mut().get_mut();
// `StopWorker` message handler // `StopWorker` message handler
if let Poll::Ready(Some(Stop { graceful, tx })) = Pin::new(&mut this.rx2).poll_recv(cx) if let Poll::Ready(Some(Stop { graceful, tx })) =
Pin::new(&mut this.stop_rx).poll_recv(cx)
{ {
let num = this.counter.total(); let num = this.counter.total();
if num == 0 { if num == 0 {
@@ -552,6 +648,14 @@ impl Future for ServerWorker {
self.poll(cx) self.poll(cx)
} }
WorkerState::Shutdown(ref mut shutdown) => { WorkerState::Shutdown(ref mut shutdown) => {
// drop all pending connections in rx channel.
while let Poll::Ready(Some(conn)) = Pin::new(&mut this.conn_rx).poll_recv(cx) {
// WorkerCounterGuard is needed as Accept thread has incremented counter.
// It's guard's job to decrement the counter together with drop of Conn.
let guard = this.counter.guard();
drop((conn, guard));
}
// wait for 1 second // wait for 1 second
ready!(shutdown.timer.as_mut().poll(cx)); ready!(shutdown.timer.as_mut().poll(cx));
@@ -592,7 +696,7 @@ impl Future for ServerWorker {
} }
// handle incoming io stream // handle incoming io stream
match ready!(Pin::new(&mut this.rx).poll_recv(cx)) { match ready!(Pin::new(&mut this.conn_rx).poll_recv(cx)) {
Some(msg) => { Some(msg) => {
let guard = this.counter.guard(); let guard = this.counter.guard();
let _ = this.services[msg.token].service.call((guard, msg.io)); let _ = this.services[msg.token].service.call((guard, msg.io));
@@ -603,3 +707,19 @@ impl Future for ServerWorker {
} }
} }
} }
fn wrap_worker_services(
services: Vec<(usize, usize, BoxedServerService)>,
) -> Vec<WorkerService> {
services
.into_iter()
.fold(Vec::new(), |mut services, (idx, token, service)| {
assert_eq!(token, services.len());
services.push(WorkerService {
factory_idx: idx,
service,
status: WorkerServiceStatus::Unavailable,
});
services
})
}

View File

@@ -1,18 +1,19 @@
use std::sync::atomic::{AtomicUsize, Ordering}; use std::{
use std::sync::{mpsc, Arc}; net,
use std::{net, thread, time::Duration}; sync::{
atomic::{AtomicUsize, Ordering},
mpsc, Arc,
},
thread,
time::Duration,
};
use actix_rt::{net::TcpStream, time::sleep}; use actix_rt::{net::TcpStream, time::sleep};
use actix_server::Server; use actix_server::{Server, TestServer};
use actix_service::fn_service; use actix_service::fn_service;
fn unused_addr() -> net::SocketAddr { fn unused_addr() -> net::SocketAddr {
let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap(); TestServer::unused_addr()
let socket = mio::net::TcpSocket::new_v4().unwrap();
socket.bind(addr).unwrap();
socket.set_reuseaddr(true).unwrap();
let tcp = socket.listen(32).unwrap();
tcp.local_addr().unwrap()
} }
#[test] #[test]
@@ -30,28 +31,63 @@ fn test_bind() {
})? })?
.run(); .run();
let _ = tx.send((srv.handle(), actix_rt::System::current())); let _ = tx.send(srv.handle());
srv.await srv.await
}) })
}); });
let (srv, sys) = rx.recv().unwrap(); let srv = rx.recv().unwrap();
thread::sleep(Duration::from_millis(500));
assert!(net::TcpStream::connect(addr).is_ok());
let _ = srv.stop(true);
h.join().unwrap().unwrap();
}
#[test]
fn plain_tokio_runtime() {
let addr = unused_addr();
let (tx, rx) = mpsc::channel();
let h = thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async {
let srv = Server::build()
.workers(1)
.disable_signals()
.bind("test", addr, move || {
fn_service(|_| async { Ok::<_, ()>(()) })
})?
.run();
tx.send(srv.handle()).unwrap();
srv.await
})
});
let srv = rx.recv().unwrap();
thread::sleep(Duration::from_millis(500)); thread::sleep(Duration::from_millis(500));
assert!(net::TcpStream::connect(addr).is_ok()); assert!(net::TcpStream::connect(addr).is_ok());
let _ = srv.stop(true); let _ = srv.stop(true);
sys.stop();
h.join().unwrap().unwrap(); h.join().unwrap().unwrap();
} }
#[test] #[test]
fn test_listen() { fn test_listen() {
let addr = unused_addr(); let addr = unused_addr();
let lst = net::TcpListener::bind(addr).unwrap();
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
let h = thread::spawn(move || { let h = thread::spawn(move || {
let lst = net::TcpListener::bind(addr)?;
actix_rt::System::new().block_on(async { actix_rt::System::new().block_on(async {
let srv = Server::build() let srv = Server::build()
.disable_signals() .disable_signals()
@@ -61,19 +97,18 @@ fn test_listen() {
})? })?
.run(); .run();
let _ = tx.send((srv.handle(), actix_rt::System::current())); let _ = tx.send(srv.handle());
srv.await srv.await
}) })
}); });
let (srv, sys) = rx.recv().unwrap(); let srv = rx.recv().unwrap();
thread::sleep(Duration::from_millis(500)); thread::sleep(Duration::from_millis(500));
assert!(net::TcpStream::connect(addr).is_ok()); assert!(net::TcpStream::connect(addr).is_ok());
let _ = srv.stop(true); let _ = srv.stop(true);
sys.stop();
h.join().unwrap().unwrap(); h.join().unwrap().unwrap();
} }
@@ -280,12 +315,12 @@ async fn test_service_restart() {
.workers(1) .workers(1)
.run(); .run();
let _ = tx.send((srv.handle(), actix_rt::System::current())); let _ = tx.send(srv.handle());
srv.await srv.await
}) })
}); });
let (srv, sys) = rx.recv().unwrap(); let srv = rx.recv().unwrap();
for _ in 0..5 { for _ in 0..5 {
TcpStream::connect(addr1) TcpStream::connect(addr1)
@@ -308,7 +343,6 @@ async fn test_service_restart() {
assert!(num2_clone.load(Ordering::SeqCst) > 5); assert!(num2_clone.load(Ordering::SeqCst) > 5);
let _ = srv.stop(false); let _ = srv.stop(false);
sys.stop();
h.join().unwrap().unwrap(); h.join().unwrap().unwrap();
} }
@@ -385,13 +419,13 @@ async fn worker_restart() {
.workers(2) .workers(2)
.run(); .run();
let _ = tx.send((srv.handle(), actix_rt::System::current())); let _ = tx.send(srv.handle());
srv.await srv.await
}) })
}); });
let (srv, sys) = rx.recv().unwrap(); let srv = rx.recv().unwrap();
sleep(Duration::from_secs(3)).await; sleep(Duration::from_secs(3)).await;
@@ -449,6 +483,31 @@ async fn worker_restart() {
stream.shutdown().await.unwrap(); stream.shutdown().await.unwrap();
let _ = srv.stop(false); let _ = srv.stop(false);
sys.stop();
h.join().unwrap().unwrap(); h.join().unwrap().unwrap();
} }
#[test]
#[should_panic]
fn no_runtime() {
// test set up in a way that would prevent time out if support for runtime-less init was added
let addr = unused_addr();
let srv = Server::build()
.workers(1)
.disable_signals()
.bind("test", addr, move || {
fn_service(|_| async { Ok::<_, ()>(()) })
})
.unwrap()
.run();
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let _ = srv.handle().stop(true);
rt.block_on(async { srv.await }).unwrap();
}

View File

@@ -56,221 +56,124 @@
## 1.0.6 - 2020-08-09 ## 1.0.6 - 2020-08-09
### Fixed
* Removed unsound custom Cell implementation that allowed obtaining several mutable references to * Removed unsound custom Cell implementation that allowed obtaining several mutable references to
the same data, which is undefined behavior in Rust and could lead to violations of memory safety. External code could obtain several mutable references to the same data through the same data, which is undefined behavior in Rust and could lead to violations of memory safety. External code could obtain several mutable references to the same data through
service combinators. Attempts to acquire several mutable references to the same data will instead service combinators. Attempts to acquire several mutable references to the same data will instead
result in a panic. result in a panic.
## [1.0.5] - 2020-01-16
### Fixed ## 1.0.5 - 2020-01-16
* Fixed unsoundness in .and_then()/.then() service combinators.
* Fixed unsoundness in .and_then()/.then() service combinators
## [1.0.4] - 2020-01-15
### Fixed
## 1.0.4 - 2020-01-15
* Revert 1.0.3 change * Revert 1.0.3 change
## [1.0.3] - 2020-01-15
### Fixed ## 1.0.3 - 2020-01-15
* Fixed unsoundness in `AndThenService` impl.
* Fixed unsoundness in `AndThenService` impl
## [1.0.2] - 2020-01-08
### Added
* Add `into_service` helper function
## [1.0.1] - 2019-12-22 ## 1.0.2 - 2020-01-08
* Add `into_service` helper function.
### Changed
* `map_config()` and `unit_config()` accepts `IntoServiceFactory` type
## [1.0.0] - 2019-12-11 ## 1.0.1 - 2019-12-22
* `map_config()` and `unit_config()` now accept `IntoServiceFactory` type.
### Added
## 1.0.0 - 2019-12-11
* Add Clone impl for Apply service * Add Clone impl for Apply service
## [1.0.0-alpha.4] - 2019-12-08 ## 1.0.0-alpha.4 - 2019-12-08
### Changed
* Renamed `service_fn` to `fn_service` * Renamed `service_fn` to `fn_service`
* Renamed `factory_fn` to `fn_factory` * Renamed `factory_fn` to `fn_factory`
* Renamed `factory_fn_cfg` to `fn_factory_with_config` * Renamed `factory_fn_cfg` to `fn_factory_with_config`
## [1.0.0-alpha.3] - 2019-12-06 ## 1.0.0-alpha.3 - 2019-12-06
### Changed
* Add missing Clone impls * Add missing Clone impls
* Restore `Transform::map_init_err()` combinator * Restore `Transform::map_init_err()` combinator
* Restore `Service/Factory::apply_fn()` in form of `Pipeline/Factory::and_then_apply_fn()` * Restore `Service/Factory::apply_fn()` in form of `Pipeline/Factory::and_then_apply_fn()`
* Optimize service combinators and futures memory layout * Optimize service combinators and futures memory layout
## [1.0.0-alpha.2] - 2019-12-02 ## 1.0.0-alpha.2 - 2019-12-02
### Changed
* Use owned config value for service factory * Use owned config value for service factory
* Renamed BoxedNewService/BoxedService to BoxServiceFactory/BoxService * Renamed BoxedNewService/BoxedService to BoxServiceFactory/BoxService
## [1.0.0-alpha.1] - 2019-11-25 ## 1.0.0-alpha.1 - 2019-11-25
* Migrated to `std::future`
### Changed
* Migraded to `std::future`
* `NewService` renamed to `ServiceFactory` * `NewService` renamed to `ServiceFactory`
* Added `pipeline` and `pipeline_factory` function * Added `pipeline` and `pipeline_factory` function
## [0.4.2] - 2019-08-27 ## 0.4.2 - 2019-08-27
### Fixed
* Check service readiness for `new_apply_cfg` combinator * Check service readiness for `new_apply_cfg` combinator
## [0.4.1] - 2019-06-06 ## 0.4.1 - 2019-06-06
### Added
* Add `new_apply_cfg` function * Add `new_apply_cfg` function
## [0.4.0] - 2019-05-12
### Changed ## 0.4.0 - 2019-05-12
* Add `NewService::map_config` and `NewService::unit_config` combinators.
* Use associated type for `NewService` config * Use associated type for `NewService` config.
* Change `apply_cfg` function.
* Change `apply_cfg` function * Renamed helper functions.
* Renamed helper functions
### Added
* Add `NewService::map_config` and `NewService::unit_config` combinators
## [0.3.6] - 2019-04-07 ## 0.3.6 - 2019-04-07
### Changed
* Poll boxed service call result immediately * Poll boxed service call result immediately
## [0.3.5] - 2019-03-29 ## 0.3.5 - 2019-03-29
* Add `impl<S: Service> Service for Rc<RefCell<S>>`.
### Added
* Add `impl<S: Service> Service for Rc<RefCell<S>>`
## [0.3.4] - 2019-03-12 ## 0.3.4 - 2019-03-12
### Added
* Add `Transform::from_err()` combinator * Add `Transform::from_err()` combinator
* Add `apply_fn` helper * Add `apply_fn` helper
* Add `apply_fn_factory` helper * Add `apply_fn_factory` helper
* Add `apply_transform` helper * Add `apply_transform` helper
* Add `apply_cfg` helper * Add `apply_cfg` helper
## [0.3.3] - 2019-03-09 ## 0.3.3 - 2019-03-09
### Added
* Add `ApplyTransform` new service for transform and new service. * Add `ApplyTransform` new service for transform and new service.
* Add `NewService::apply_cfg()` combinator, allows to use nested `NewService` with different config parameter.
* Add `NewService::apply_cfg()` combinator, allows to use
nested `NewService` with different config parameter.
### Changed
* Revert IntoFuture change * Revert IntoFuture change
## [0.3.2] - 2019-03-04 ## 0.3.2 - 2019-03-04
### Changed
* Change `NewService::Future` and `Transform::Future` to the `IntoFuture` trait. * Change `NewService::Future` and `Transform::Future` to the `IntoFuture` trait.
* Export `AndThenTransform` type * Export `AndThenTransform` type
## [0.3.1] - 2019-03-04 ## 0.3.1 - 2019-03-04
### Changed
* Simplify Transform trait * Simplify Transform trait
## [0.3.0] - 2019-03-02 ## 0.3.0 - 2019-03-02
## Added
* Added boxed NewService and Service. * Added boxed NewService and Service.
## Changed
* Added `Config` parameter to `NewService` trait. * Added `Config` parameter to `NewService` trait.
* Added `Config` parameter to `NewTransform` trait. * Added `Config` parameter to `NewTransform` trait.
## [0.2.2] - 2019-02-19 ## 0.2.2 - 2019-02-19
### Added
* Added `NewService` impl for `Rc<S> where S: NewService` * Added `NewService` impl for `Rc<S> where S: NewService`
* Added `NewService` impl for `Arc<S> where S: NewService` * Added `NewService` impl for `Arc<S> where S: NewService`
## [0.2.1] - 2019-02-03 ## 0.2.1 - 2019-02-03
### Changed
* Generalize `.apply` combinator with Transform trait * Generalize `.apply` combinator with Transform trait
## [0.2.0] - 2019-02-01 ## 0.2.0 - 2019-02-01
### Changed
* Use associated type instead of generic for Service definition. * Use associated type instead of generic for Service definition.
* Before: * Before:
```rust ```rust
impl Service<Request> for Client { impl Service<Request> for Client {
type Response = Response; type Response = Response;
@@ -278,7 +181,6 @@
} }
``` ```
* After: * After:
```rust ```rust
impl Service for Client { impl Service for Client {
type Request = Request; type Request = Request;
@@ -288,50 +190,30 @@
``` ```
## [0.1.6] - 2019-01-24 ## 0.1.6 - 2019-01-24
### Changed
* Use `FnMut` instead of `Fn` for .apply() and .map() combinators and `FnService` type * Use `FnMut` instead of `Fn` for .apply() and .map() combinators and `FnService` type
* Change `.apply()` error semantic, new service's error is `From<Self::Error>` * Change `.apply()` error semantic, new service's error is `From<Self::Error>`
## [0.1.5] - 2019-01-13 ## 0.1.5 - 2019-01-13
* Make `Out::Error` convertible from `T::Error` for apply combinator
### Changed
* Make `Out::Error` convertable from `T::Error` for apply combinator
## [0.1.4] - 2019-01-11 ## 0.1.4 - 2019-01-11
### Changed
* Use `FnMut` instead of `Fn` for `FnService` * Use `FnMut` instead of `Fn` for `FnService`
## [0.1.3] - 2018-12-12 ## 0.1.3 - 2018-12-12
### Changed
* Split service combinators to separate trait * Split service combinators to separate trait
## [0.1.2] - 2018-12-12 ## 0.1.2 - 2018-12-12
### Fixed
* Release future early for `.and_then()` and `.then()` combinators * Release future early for `.and_then()` and `.then()` combinators
## [0.1.1] - 2018-12-09 ## 0.1.1 - 2018-12-09
* Added Service impl for `Box<S: Service>`
### Added
* Added Service impl for Box<S: Service>
## [0.1.0] - 2018-12-09 ## 0.1.0 - 2018-12-09
* Initial import * Initial import

View File

@@ -1,7 +1,7 @@
/// An implementation of [`poll_ready`]() that always signals readiness. /// An implementation of [`poll_ready`]() that always signals readiness.
/// ///
/// This should only be used for basic leaf services that have no concept of un-readiness. /// This should only be used for basic leaf services that have no concept of un-readiness.
/// For wrapper or other serivice types, use [`forward_ready!`] for simple cases or write a bespoke /// For wrapper or other service types, use [`forward_ready!`] for simple cases or write a bespoke
/// `poll_ready` implementation. /// `poll_ready` implementation.
/// ///
/// [`poll_ready`]: crate::Service::poll_ready /// [`poll_ready`]: crate::Service::poll_ready

View File

@@ -3,6 +3,22 @@
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
## 3.0.0-beta.9 - 2021-11-22
* Add configurable timeout for accepting TLS connection. [#393]
* Added `TlsError::Timeout` variant. [#393]
* All TLS acceptor services now use `TlsError` for their error types. [#393]
* Added `TlsError::into_service_error`. [#420]
[#393]: https://github.com/actix/actix-net/pull/393
[#420]: https://github.com/actix/actix-net/pull/420
## 3.0.0-beta.8 - 2021-11-15
* Add `Connect::request` for getting a reference to the connection request. [#415]
[#415]: https://github.com/actix/actix-net/pull/415
## 3.0.0-beta.7 - 2021-10-20 ## 3.0.0-beta.7 - 2021-10-20
* Add `webpki_roots_cert_store()` to get rustls compatible webpki roots cert store. [#401] * Add `webpki_roots_cert_store()` to get rustls compatible webpki roots cert store. [#401]
* Alias `connect::ssl` to `connect::tls`. [#401] * Alias `connect::ssl` to `connect::tls`. [#401]

View File

@@ -1,7 +1,10 @@
[package] [package]
name = "actix-tls" name = "actix-tls"
version = "3.0.0-beta.7" version = "3.0.0-beta.9"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = [
"Nikolay Kim <fafhrd91@gmail.com>",
"Rob Ede <robjtede@icloud.com>",
]
description = "TLS acceptor and connector services for Actix ecosystem" description = "TLS acceptor and connector services for Actix ecosystem"
keywords = ["network", "tls", "ssl", "async", "transport"] keywords = ["network", "tls", "ssl", "async", "transport"]
repository = "https://github.com/actix/actix-net.git" repository = "https://github.com/actix/actix-net.git"
@@ -47,6 +50,7 @@ derive_more = "0.99.5"
futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] } futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }
http = { version = "0.2.3", optional = true } http = { version = "0.2.3", optional = true }
log = "0.4" log = "0.4"
pin-project-lite = "0.2.7"
tokio-util = { version = "0.6.3", default-features = false } tokio-util = { version = "0.6.3", default-features = false }
# openssl # openssl
@@ -62,12 +66,14 @@ tokio-native-tls = { version = "0.3", optional = true }
[dev-dependencies] [dev-dependencies]
actix-rt = "2.2.0" actix-rt = "2.2.0"
actix-server = "2.0.0-beta.6" actix-server = "2.0.0-beta.9"
bytes = "1" bytes = "1"
env_logger = "0.9" env_logger = "0.9"
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] } futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
log = "0.4" log = "0.4"
rcgen = "0.8"
rustls-pemfile = "0.2.1" rustls-pemfile = "0.2.1"
tokio-rustls = { version = "0.23", features = ["dangerous_configuration"] }
trust-dns-resolver = "0.20.0" trust-dns-resolver = "0.20.0"
[[example]] [[example]]

View File

@@ -1,11 +1,9 @@
//! TLS acceptor services for Actix ecosystem. //! TLS acceptor services.
//!
//! ## Crate Features
//! * `openssl` - TLS acceptor using the `openssl` crate.
//! * `rustls` - TLS acceptor using the `rustls` crate.
//! * `native-tls` - TLS acceptor using the `native-tls` crate.
use std::sync::atomic::{AtomicUsize, Ordering}; use std::{
convert::Infallible,
sync::atomic::{AtomicUsize, Ordering},
};
use actix_utils::counter::Counter; use actix_utils::counter::Counter;
@@ -20,6 +18,10 @@ pub mod native_tls;
pub(crate) static MAX_CONN: AtomicUsize = AtomicUsize::new(256); pub(crate) static MAX_CONN: AtomicUsize = AtomicUsize::new(256);
#[cfg(any(feature = "openssl", feature = "rustls", feature = "native-tls"))]
pub(crate) const DEFAULT_TLS_HANDSHAKE_TIMEOUT: std::time::Duration =
std::time::Duration::from_secs(3);
thread_local! { thread_local! {
static MAX_CONN_COUNTER: Counter = Counter::new(MAX_CONN.load(Ordering::Relaxed)); static MAX_CONN_COUNTER: Counter = Counter::new(MAX_CONN.load(Ordering::Relaxed));
} }
@@ -36,7 +38,30 @@ pub fn max_concurrent_tls_connect(num: usize) {
/// TLS error combined with service error. /// TLS error combined with service error.
#[derive(Debug)] #[derive(Debug)]
pub enum TlsError<E1, E2> { pub enum TlsError<TlsErr, SvcErr> {
Tls(E1), Timeout,
Service(E2), Tls(TlsErr),
Service(SvcErr),
}
impl<TlsErr> TlsError<TlsErr, Infallible> {
/// Casts the infallible service error type returned from acceptors into caller's type.
pub fn into_service_error<SvcErr>(self) -> TlsError<TlsErr, SvcErr> {
match self {
Self::Timeout => TlsError::Timeout,
Self::Tls(err) => TlsError::Tls(err),
Self::Service(_) => unreachable!(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn tls_service_error_inference() {
let a: TlsError<u32, Infallible> = TlsError::Tls(42);
let _b: TlsError<u32, u64> = a.into_service_error();
}
} }

View File

@@ -1,20 +1,24 @@
use std::{ use std::{
convert::Infallible,
io::{self, IoSlice}, io::{self, IoSlice},
ops::{Deref, DerefMut}, ops::{Deref, DerefMut},
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
time::Duration,
}; };
use actix_codec::{AsyncRead, AsyncWrite, ReadBuf}; use actix_codec::{AsyncRead, AsyncWrite, ReadBuf};
use actix_rt::net::{ActixStream, Ready}; use actix_rt::{
net::{ActixStream, Ready},
time::timeout,
};
use actix_service::{Service, ServiceFactory}; use actix_service::{Service, ServiceFactory};
use actix_utils::counter::Counter; use actix_utils::counter::Counter;
use futures_core::future::LocalBoxFuture; use futures_core::future::LocalBoxFuture;
pub use tokio_native_tls::native_tls::Error; pub use tokio_native_tls::{native_tls::Error, TlsAcceptor};
pub use tokio_native_tls::TlsAcceptor;
use super::MAX_CONN_COUNTER; use super::{TlsError, DEFAULT_TLS_HANDSHAKE_TIMEOUT, MAX_CONN_COUNTER};
/// Wrapper type for `tokio_native_tls::TlsStream` in order to impl `ActixStream` trait. /// Wrapper type for `tokio_native_tls::TlsStream` in order to impl `ActixStream` trait.
pub struct TlsStream<T>(tokio_native_tls::TlsStream<T>); pub struct TlsStream<T>(tokio_native_tls::TlsStream<T>);
@@ -94,13 +98,25 @@ impl<T: ActixStream> ActixStream for TlsStream<T> {
/// `native-tls` feature enables this `Acceptor` type. /// `native-tls` feature enables this `Acceptor` type.
pub struct Acceptor { pub struct Acceptor {
acceptor: TlsAcceptor, acceptor: TlsAcceptor,
handshake_timeout: Duration,
} }
impl Acceptor { impl Acceptor {
/// Create `native-tls` based `Acceptor` service factory. /// Create `native-tls` based `Acceptor` service factory.
#[inline] #[inline]
pub fn new(acceptor: TlsAcceptor) -> Self { pub fn new(acceptor: TlsAcceptor) -> Self {
Acceptor { acceptor } Acceptor {
acceptor,
handshake_timeout: DEFAULT_TLS_HANDSHAKE_TIMEOUT,
}
}
/// Limit the amount of time that the acceptor will wait for a TLS handshake to complete.
///
/// Default timeout is 3 seconds.
pub fn set_handshake_timeout(&mut self, handshake_timeout: Duration) -> &mut Self {
self.handshake_timeout = handshake_timeout;
self
} }
} }
@@ -109,15 +125,15 @@ impl Clone for Acceptor {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
acceptor: self.acceptor.clone(), acceptor: self.acceptor.clone(),
handshake_timeout: self.handshake_timeout,
} }
} }
} }
impl<T: ActixStream + 'static> ServiceFactory<T> for Acceptor { impl<T: ActixStream + 'static> ServiceFactory<T> for Acceptor {
type Response = TlsStream<T>; type Response = TlsStream<T>;
type Error = Error; type Error = TlsError<Error, Infallible>;
type Config = (); type Config = ();
type Service = NativeTlsAcceptorService; type Service = NativeTlsAcceptorService;
type InitError = (); type InitError = ();
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>; type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
@@ -127,8 +143,10 @@ impl<T: ActixStream + 'static> ServiceFactory<T> for Acceptor {
Ok(NativeTlsAcceptorService { Ok(NativeTlsAcceptorService {
acceptor: self.acceptor.clone(), acceptor: self.acceptor.clone(),
conns: conns.clone(), conns: conns.clone(),
handshake_timeout: self.handshake_timeout,
}) })
}); });
Box::pin(async { res }) Box::pin(async { res })
} }
} }
@@ -136,12 +154,13 @@ impl<T: ActixStream + 'static> ServiceFactory<T> for Acceptor {
pub struct NativeTlsAcceptorService { pub struct NativeTlsAcceptorService {
acceptor: TlsAcceptor, acceptor: TlsAcceptor,
conns: Counter, conns: Counter,
handshake_timeout: Duration,
} }
impl<T: ActixStream + 'static> Service<T> for NativeTlsAcceptorService { impl<T: ActixStream + 'static> Service<T> for NativeTlsAcceptorService {
type Response = TlsStream<T>; type Response = TlsStream<T>;
type Error = Error; type Error = TlsError<Error, Infallible>;
type Future = LocalBoxFuture<'static, Result<TlsStream<T>, Error>>; type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.conns.available(cx) { if self.conns.available(cx) {
@@ -154,10 +173,18 @@ impl<T: ActixStream + 'static> Service<T> for NativeTlsAcceptorService {
fn call(&self, io: T) -> Self::Future { fn call(&self, io: T) -> Self::Future {
let guard = self.conns.get(); let guard = self.conns.get();
let acceptor = self.acceptor.clone(); let acceptor = self.acceptor.clone();
let dur = self.handshake_timeout;
Box::pin(async move { Box::pin(async move {
let io = acceptor.accept(io).await; match timeout(dur, acceptor.accept(io)).await {
Ok(Ok(io)) => {
drop(guard); drop(guard);
io.map(Into::into) Ok(TlsStream(io))
}
Ok(Err(err)) => Err(TlsError::Tls(err)),
Err(_timeout) => Err(TlsError::Timeout),
}
}) })
} }
} }

View File

@@ -1,22 +1,28 @@
use std::{ use std::{
convert::Infallible,
future::Future, future::Future,
io::{self, IoSlice}, io::{self, IoSlice},
ops::{Deref, DerefMut}, ops::{Deref, DerefMut},
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
time::Duration,
}; };
use actix_codec::{AsyncRead, AsyncWrite, ReadBuf}; use actix_codec::{AsyncRead, AsyncWrite, ReadBuf};
use actix_rt::net::{ActixStream, Ready}; use actix_rt::{
net::{ActixStream, Ready},
time::{sleep, Sleep},
};
use actix_service::{Service, ServiceFactory}; use actix_service::{Service, ServiceFactory};
use actix_utils::counter::{Counter, CounterGuard}; use actix_utils::counter::{Counter, CounterGuard};
use futures_core::{future::LocalBoxFuture, ready}; use futures_core::future::LocalBoxFuture;
pub use openssl::ssl::{ pub use openssl::ssl::{
AlpnError, Error as SslError, HandshakeError, Ssl, SslAcceptor, SslAcceptorBuilder, AlpnError, Error as SslError, HandshakeError, Ssl, SslAcceptor, SslAcceptorBuilder,
}; };
use pin_project_lite::pin_project;
use super::MAX_CONN_COUNTER; use super::{TlsError, DEFAULT_TLS_HANDSHAKE_TIMEOUT, MAX_CONN_COUNTER};
/// Wrapper type for `tokio_openssl::SslStream` in order to impl `ActixStream` trait. /// Wrapper type for `tokio_openssl::SslStream` in order to impl `ActixStream` trait.
pub struct TlsStream<T>(tokio_openssl::SslStream<T>); pub struct TlsStream<T>(tokio_openssl::SslStream<T>);
@@ -96,13 +102,25 @@ impl<T: ActixStream> ActixStream for TlsStream<T> {
/// `openssl` feature enables this `Acceptor` type. /// `openssl` feature enables this `Acceptor` type.
pub struct Acceptor { pub struct Acceptor {
acceptor: SslAcceptor, acceptor: SslAcceptor,
handshake_timeout: Duration,
} }
impl Acceptor { impl Acceptor {
/// Create OpenSSL based `Acceptor` service factory. /// Create OpenSSL based `Acceptor` service factory.
#[inline] #[inline]
pub fn new(acceptor: SslAcceptor) -> Self { pub fn new(acceptor: SslAcceptor) -> Self {
Acceptor { acceptor } Acceptor {
acceptor,
handshake_timeout: DEFAULT_TLS_HANDSHAKE_TIMEOUT,
}
}
/// Limit the amount of time that the acceptor will wait for a TLS handshake to complete.
///
/// Default timeout is 3 seconds.
pub fn set_handshake_timeout(&mut self, handshake_timeout: Duration) -> &mut Self {
self.handshake_timeout = handshake_timeout;
self
} }
} }
@@ -111,13 +129,14 @@ impl Clone for Acceptor {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
acceptor: self.acceptor.clone(), acceptor: self.acceptor.clone(),
handshake_timeout: self.handshake_timeout,
} }
} }
} }
impl<T: ActixStream> ServiceFactory<T> for Acceptor { impl<T: ActixStream> ServiceFactory<T> for Acceptor {
type Response = TlsStream<T>; type Response = TlsStream<T>;
type Error = SslError; type Error = TlsError<SslError, Infallible>;
type Config = (); type Config = ();
type Service = AcceptorService; type Service = AcceptorService;
type InitError = (); type InitError = ();
@@ -128,8 +147,10 @@ impl<T: ActixStream> ServiceFactory<T> for Acceptor {
Ok(AcceptorService { Ok(AcceptorService {
acceptor: self.acceptor.clone(), acceptor: self.acceptor.clone(),
conns: conns.clone(), conns: conns.clone(),
handshake_timeout: self.handshake_timeout,
}) })
}); });
Box::pin(async { res }) Box::pin(async { res })
} }
} }
@@ -137,11 +158,12 @@ impl<T: ActixStream> ServiceFactory<T> for Acceptor {
pub struct AcceptorService { pub struct AcceptorService {
acceptor: SslAcceptor, acceptor: SslAcceptor,
conns: Counter, conns: Counter,
handshake_timeout: Duration,
} }
impl<T: ActixStream> Service<T> for AcceptorService { impl<T: ActixStream> Service<T> for AcceptorService {
type Response = TlsStream<T>; type Response = TlsStream<T>;
type Error = SslError; type Error = TlsError<SslError, Infallible>;
type Future = AcceptorServiceResponse<T>; type Future = AcceptorServiceResponse<T>;
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
@@ -155,27 +177,38 @@ impl<T: ActixStream> Service<T> for AcceptorService {
fn call(&self, io: T) -> Self::Future { fn call(&self, io: T) -> Self::Future {
let ssl_ctx = self.acceptor.context(); let ssl_ctx = self.acceptor.context();
let ssl = Ssl::new(ssl_ctx).expect("Provided SSL acceptor was invalid."); let ssl = Ssl::new(ssl_ctx).expect("Provided SSL acceptor was invalid.");
AcceptorServiceResponse { AcceptorServiceResponse {
_guard: self.conns.get(), _guard: self.conns.get(),
timeout: sleep(self.handshake_timeout),
stream: Some(tokio_openssl::SslStream::new(ssl, io).unwrap()), stream: Some(tokio_openssl::SslStream::new(ssl, io).unwrap()),
} }
} }
} }
pub struct AcceptorServiceResponse<T: ActixStream> { pin_project! {
pub struct AcceptorServiceResponse<T: ActixStream> {
stream: Option<tokio_openssl::SslStream<T>>, stream: Option<tokio_openssl::SslStream<T>>,
#[pin]
timeout: Sleep,
_guard: CounterGuard, _guard: CounterGuard,
}
} }
impl<T: ActixStream> Future for AcceptorServiceResponse<T> { impl<T: ActixStream> Future for AcceptorServiceResponse<T> {
type Output = Result<TlsStream<T>, SslError>; type Output = Result<TlsStream<T>, TlsError<SslError, Infallible>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
ready!(Pin::new(self.stream.as_mut().unwrap()).poll_accept(cx))?; let this = self.project();
Poll::Ready(Ok(self
match Pin::new(this.stream.as_mut().unwrap()).poll_accept(cx) {
Poll::Ready(Ok(())) => Poll::Ready(Ok(this
.stream .stream
.take() .take()
.expect("SSL connect has resolved.") .expect("Acceptor should not be polled after it has completed.")
.into())) .into())),
Poll::Ready(Err(err)) => Poll::Ready(Err(TlsError::Tls(err))),
Poll::Pending => this.timeout.poll(cx).map(|_| Err(TlsError::Timeout)),
}
} }
} }

View File

@@ -1,22 +1,28 @@
use std::{ use std::{
convert::Infallible,
future::Future, future::Future,
io::{self, IoSlice}, io::{self, IoSlice},
ops::{Deref, DerefMut}, ops::{Deref, DerefMut},
pin::Pin, pin::Pin,
sync::Arc, sync::Arc,
task::{Context, Poll}, task::{Context, Poll},
time::Duration,
}; };
use actix_codec::{AsyncRead, AsyncWrite, ReadBuf}; use actix_codec::{AsyncRead, AsyncWrite, ReadBuf};
use actix_rt::net::{ActixStream, Ready}; use actix_rt::{
net::{ActixStream, Ready},
time::{sleep, Sleep},
};
use actix_service::{Service, ServiceFactory}; use actix_service::{Service, ServiceFactory};
use actix_utils::counter::{Counter, CounterGuard}; use actix_utils::counter::{Counter, CounterGuard};
use futures_core::future::LocalBoxFuture; use futures_core::future::LocalBoxFuture;
use pin_project_lite::pin_project;
use tokio_rustls::{Accept, TlsAcceptor}; use tokio_rustls::{Accept, TlsAcceptor};
pub use tokio_rustls::rustls::ServerConfig; pub use tokio_rustls::rustls::ServerConfig;
use super::MAX_CONN_COUNTER; use super::{TlsError, DEFAULT_TLS_HANDSHAKE_TIMEOUT, MAX_CONN_COUNTER};
/// Wrapper type for `tokio_openssl::SslStream` in order to impl `ActixStream` trait. /// Wrapper type for `tokio_openssl::SslStream` in order to impl `ActixStream` trait.
pub struct TlsStream<T>(tokio_rustls::server::TlsStream<T>); pub struct TlsStream<T>(tokio_rustls::server::TlsStream<T>);
@@ -96,6 +102,7 @@ impl<T: ActixStream> ActixStream for TlsStream<T> {
/// `rustls` feature enables this `Acceptor` type. /// `rustls` feature enables this `Acceptor` type.
pub struct Acceptor { pub struct Acceptor {
config: Arc<ServerConfig>, config: Arc<ServerConfig>,
handshake_timeout: Duration,
} }
impl Acceptor { impl Acceptor {
@@ -104,8 +111,17 @@ impl Acceptor {
pub fn new(config: ServerConfig) -> Self { pub fn new(config: ServerConfig) -> Self {
Acceptor { Acceptor {
config: Arc::new(config), config: Arc::new(config),
handshake_timeout: DEFAULT_TLS_HANDSHAKE_TIMEOUT,
} }
} }
/// Limit the amount of time that the acceptor will wait for a TLS handshake to complete.
///
/// Default timeout is 3 seconds.
pub fn set_handshake_timeout(&mut self, handshake_timeout: Duration) -> &mut Self {
self.handshake_timeout = handshake_timeout;
self
}
} }
impl Clone for Acceptor { impl Clone for Acceptor {
@@ -113,15 +129,15 @@ impl Clone for Acceptor {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
config: self.config.clone(), config: self.config.clone(),
handshake_timeout: self.handshake_timeout,
} }
} }
} }
impl<T: ActixStream> ServiceFactory<T> for Acceptor { impl<T: ActixStream> ServiceFactory<T> for Acceptor {
type Response = TlsStream<T>; type Response = TlsStream<T>;
type Error = io::Error; type Error = TlsError<io::Error, Infallible>;
type Config = (); type Config = ();
type Service = AcceptorService; type Service = AcceptorService;
type InitError = (); type InitError = ();
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>; type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
@@ -131,8 +147,10 @@ impl<T: ActixStream> ServiceFactory<T> for Acceptor {
Ok(AcceptorService { Ok(AcceptorService {
acceptor: self.config.clone().into(), acceptor: self.config.clone().into(),
conns: conns.clone(), conns: conns.clone(),
handshake_timeout: self.handshake_timeout,
}) })
}); });
Box::pin(async { res }) Box::pin(async { res })
} }
} }
@@ -141,11 +159,12 @@ impl<T: ActixStream> ServiceFactory<T> for Acceptor {
pub struct AcceptorService { pub struct AcceptorService {
acceptor: TlsAcceptor, acceptor: TlsAcceptor,
conns: Counter, conns: Counter,
handshake_timeout: Duration,
} }
impl<T: ActixStream> Service<T> for AcceptorService { impl<T: ActixStream> Service<T> for AcceptorService {
type Response = TlsStream<T>; type Response = TlsStream<T>;
type Error = io::Error; type Error = TlsError<io::Error, Infallible>;
type Future = AcceptorServiceFut<T>; type Future = AcceptorServiceFut<T>;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
@@ -158,22 +177,31 @@ impl<T: ActixStream> Service<T> for AcceptorService {
fn call(&self, req: T) -> Self::Future { fn call(&self, req: T) -> Self::Future {
AcceptorServiceFut { AcceptorServiceFut {
_guard: self.conns.get(),
fut: self.acceptor.accept(req), fut: self.acceptor.accept(req),
timeout: sleep(self.handshake_timeout),
_guard: self.conns.get(),
} }
} }
} }
pub struct AcceptorServiceFut<T: ActixStream> { pin_project! {
pub struct AcceptorServiceFut<T: ActixStream> {
fut: Accept<T>, fut: Accept<T>,
#[pin]
timeout: Sleep,
_guard: CounterGuard, _guard: CounterGuard,
}
} }
impl<T: ActixStream> Future for AcceptorServiceFut<T> { impl<T: ActixStream> Future for AcceptorServiceFut<T> {
type Output = Result<TlsStream<T>, io::Error>; type Output = Result<TlsStream<T>, TlsError<io::Error, Infallible>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut(); let mut this = self.project();
Pin::new(&mut this.fut).poll(cx).map_ok(TlsStream) match Pin::new(&mut this.fut).poll(cx) {
Poll::Ready(Ok(stream)) => Poll::Ready(Ok(TlsStream(stream))),
Poll::Ready(Err(err)) => Poll::Ready(Err(TlsError::Tls(err))),
Poll::Pending => this.timeout.poll(cx).map(|_| Err(TlsError::Timeout)),
}
} }
} }

View File

@@ -63,16 +63,16 @@ impl From<Option<SocketAddr>> for ConnectAddrs {
/// Connection info. /// Connection info.
#[derive(Debug, PartialEq, Eq, Hash)] #[derive(Debug, PartialEq, Eq, Hash)]
pub struct Connect<T> { pub struct Connect<R> {
pub(crate) req: T, pub(crate) req: R,
pub(crate) port: u16, pub(crate) port: u16,
pub(crate) addr: ConnectAddrs, pub(crate) addr: ConnectAddrs,
pub(crate) local_addr: Option<IpAddr>, pub(crate) local_addr: Option<IpAddr>,
} }
impl<T: Address> Connect<T> { impl<R: Address> Connect<R> {
/// Create `Connect` instance by splitting the string by ':' and convert the second part to u16 /// Create `Connect` instance by splitting the string by ':' and convert the second part to u16
pub fn new(req: T) -> Connect<T> { pub fn new(req: R) -> Connect<R> {
let (_, port) = parse_host(req.hostname()); let (_, port) = parse_host(req.hostname());
Connect { Connect {
@@ -85,7 +85,7 @@ impl<T: Address> Connect<T> {
/// Create new `Connect` instance from host and address. Connector skips name resolution stage /// Create new `Connect` instance from host and address. Connector skips name resolution stage
/// for such connect messages. /// for such connect messages.
pub fn with_addr(req: T, addr: SocketAddr) -> Connect<T> { pub fn with_addr(req: R, addr: SocketAddr) -> Connect<R> {
Connect { Connect {
req, req,
port: 0, port: 0,
@@ -155,15 +155,20 @@ impl<T: Address> Connect<T> {
ConnectAddrs::Multi(addrs) => ConnectAddrsIter::MultiOwned(addrs.into_iter()), ConnectAddrs::Multi(addrs) => ConnectAddrsIter::MultiOwned(addrs.into_iter()),
} }
} }
/// Returns a reference to the connection request.
pub fn request(&self) -> &R {
&self.req
}
} }
impl<T: Address> From<T> for Connect<T> { impl<R: Address> From<R> for Connect<R> {
fn from(addr: T) -> Self { fn from(addr: R) -> Self {
Connect::new(addr) Connect::new(addr)
} }
} }
impl<T: Address> fmt::Display for Connect<T> { impl<R: Address> fmt::Display for Connect<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}:{}", self.hostname(), self.port()) write!(f, "{}:{}", self.hostname(), self.port())
} }
@@ -347,4 +352,10 @@ mod tests {
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)) IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))
) )
} }
#[test]
fn request_ref() {
let conn = Connect::new("hello");
assert_eq!(conn.request(), &"hello")
}
} }

View File

@@ -115,7 +115,7 @@ where
} }
pub enum RustlsConnectorServiceFuture<T, U> { pub enum RustlsConnectorServiceFuture<T, U> {
/// See issue https://github.com/briansmith/webpki/issues/54 /// See issue <https://github.com/briansmith/webpki/issues/54>
InvalidDns, InvalidDns,
Future { Future {
connect: Connect<U>, connect: Connect<U>,

View File

@@ -0,0 +1,130 @@
//! Use Rustls connector to test OpenSSL acceptor.
#![cfg(all(
feature = "accept",
feature = "connect",
feature = "rustls",
feature = "openssl"
))]
use std::{convert::TryFrom, io::Write, sync::Arc};
use actix_rt::net::TcpStream;
use actix_server::TestServer;
use actix_service::ServiceFactoryExt as _;
use actix_tls::accept::openssl::{Acceptor, TlsStream};
use actix_utils::future::ok;
use tokio_rustls::rustls::{Certificate, ClientConfig, RootCertStore, ServerName};
fn new_cert_and_key() -> (String, String) {
let cert = rcgen::generate_simple_self_signed(vec![
"127.0.0.1".to_owned(),
"localhost".to_owned(),
])
.unwrap();
let key = cert.serialize_private_key_pem();
let cert = cert.serialize_pem().unwrap();
(cert, key)
}
fn openssl_acceptor(cert: String, key: String) -> tls_openssl::ssl::SslAcceptor {
use tls_openssl::{
pkey::PKey,
ssl::{SslAcceptor, SslMethod},
x509::X509,
};
let cert = X509::from_pem(cert.as_bytes()).unwrap();
let key = PKey::private_key_from_pem(key.as_bytes()).unwrap();
let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap();
builder.set_certificate(&cert).unwrap();
builder.set_private_key(&key).unwrap();
builder.set_alpn_select_callback(|_, _protocols| Ok(b"http/1.1"));
builder.set_alpn_protos(b"\x08http/1.1").unwrap();
builder.build()
}
#[allow(dead_code)]
mod danger {
use std::time::SystemTime;
use super::*;
use tokio_rustls::rustls::{
self,
client::{ServerCertVerified, ServerCertVerifier},
};
pub struct NoCertificateVerification;
impl ServerCertVerifier for NoCertificateVerification {
fn verify_server_cert(
&self,
_end_entity: &Certificate,
_intermediates: &[Certificate],
_server_name: &ServerName,
_scts: &mut dyn Iterator<Item = &[u8]>,
_ocsp_response: &[u8],
_now: SystemTime,
) -> Result<ServerCertVerified, rustls::Error> {
Ok(ServerCertVerified::assertion())
}
}
}
#[allow(dead_code)]
fn rustls_connector(_cert: String, _key: String) -> ClientConfig {
let mut config = ClientConfig::builder()
.with_safe_defaults()
.with_root_certificates(RootCertStore::empty())
.with_no_client_auth();
config
.dangerous()
.set_certificate_verifier(Arc::new(danger::NoCertificateVerification));
config.alpn_protocols = vec![b"http/1.1".to_vec()];
config
}
#[actix_rt::test]
async fn accepts_connections() {
let (cert, key) = new_cert_and_key();
let srv = TestServer::with({
let cert = cert.clone();
let key = key.clone();
move || {
let openssl_acceptor = openssl_acceptor(cert.clone(), key.clone());
let tls_acceptor = Acceptor::new(openssl_acceptor);
tls_acceptor
.map_err(|err| println!("OpenSSL error: {:?}", err))
.and_then(move |_stream: TlsStream<TcpStream>| ok(()))
}
});
let mut sock = srv
.connect()
.expect("cannot connect to test server")
.into_std()
.unwrap();
sock.set_nonblocking(false).unwrap();
let config = rustls_connector(cert, key);
let config = Arc::new(config);
let mut conn = tokio_rustls::rustls::ClientConnection::new(
config,
ServerName::try_from("localhost").unwrap(),
)
.unwrap();
let mut stream = tokio_rustls::rustls::Stream::new(&mut conn, &mut sock);
stream.flush().expect("TLS handshake failed");
}

View File

@@ -0,0 +1,104 @@
//! Use OpenSSL connector to test Rustls acceptor.
#![cfg(all(
feature = "accept",
feature = "connect",
feature = "rustls",
feature = "openssl"
))]
use std::io::{BufReader, Write};
use actix_rt::net::TcpStream;
use actix_server::TestServer;
use actix_service::ServiceFactoryExt as _;
use actix_tls::accept::rustls::{Acceptor, TlsStream};
use actix_tls::connect::tls::openssl::SslConnector;
use actix_utils::future::ok;
use rustls_pemfile::{certs, pkcs8_private_keys};
use tls_openssl::ssl::SslVerifyMode;
use tokio_rustls::rustls::{self, Certificate, PrivateKey, ServerConfig};
fn new_cert_and_key() -> (String, String) {
let cert = rcgen::generate_simple_self_signed(vec![
"127.0.0.1".to_owned(),
"localhost".to_owned(),
])
.unwrap();
let key = cert.serialize_private_key_pem();
let cert = cert.serialize_pem().unwrap();
(cert, key)
}
fn rustls_server_config(cert: String, key: String) -> rustls::ServerConfig {
// Load TLS key and cert files
let cert = &mut BufReader::new(cert.as_bytes());
let key = &mut BufReader::new(key.as_bytes());
let cert_chain = certs(cert).unwrap().into_iter().map(Certificate).collect();
let mut keys = pkcs8_private_keys(key).unwrap();
let mut config = ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
.with_single_cert(cert_chain, PrivateKey(keys.remove(0)))
.unwrap();
config.alpn_protocols = vec![b"http/1.1".to_vec()];
config
}
fn openssl_connector(cert: String, key: String) -> SslConnector {
use actix_tls::connect::tls::openssl::{SslConnector as OpensslConnector, SslMethod};
use tls_openssl::{pkey::PKey, x509::X509};
let cert = X509::from_pem(cert.as_bytes()).unwrap();
let key = PKey::private_key_from_pem(key.as_bytes()).unwrap();
let mut ssl = OpensslConnector::builder(SslMethod::tls()).unwrap();
ssl.set_verify(SslVerifyMode::NONE);
ssl.set_certificate(&cert).unwrap();
ssl.set_private_key(&key).unwrap();
ssl.set_alpn_protos(b"\x08http/1.1").unwrap();
ssl.build()
}
#[actix_rt::test]
async fn accepts_connections() {
let (cert, key) = new_cert_and_key();
let srv = TestServer::with({
let cert = cert.clone();
let key = key.clone();
move || {
let tls_acceptor = Acceptor::new(rustls_server_config(cert.clone(), key.clone()));
tls_acceptor
.map_err(|err| println!("Rustls error: {:?}", err))
.and_then(move |_stream: TlsStream<TcpStream>| ok(()))
}
});
let sock = srv
.connect()
.expect("cannot connect to test server")
.into_std()
.unwrap();
sock.set_nonblocking(false).unwrap();
let connector = openssl_connector(cert, key);
let mut stream = connector
.connect("localhost", sock)
.expect("TLS handshake failed");
stream.do_handshake().expect("TLS handshake failed");
stream.flush().expect("TLS handshake failed");
}

View File

@@ -23,3 +23,4 @@ local-waker = "0.1"
[dev-dependencies] [dev-dependencies]
actix-rt = "2.0.0" actix-rt = "2.0.0"
futures-util = { version = "0.3.7", default-features = false } futures-util = { version = "0.3.7", default-features = false }
static_assertions = "1.1"

View File

@@ -26,7 +26,7 @@ impl Counter {
CounterGuard::new(self.0.clone()) CounterGuard::new(self.0.clone())
} }
/// Notify current task and return true if counter is at capacity. /// Returns true if counter is below capacity. Otherwise, register to wake task when it is.
pub fn available(&self, cx: &mut task::Context<'_>) -> bool { pub fn available(&self, cx: &mut task::Context<'_>) -> bool {
self.0.available(cx) self.0.available(cx)
} }

View File

@@ -103,10 +103,16 @@ pub fn err<T, E>(err: E) -> Ready<Result<T, E>> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::rc::Rc;
use futures_util::task::noop_waker; use futures_util::task::noop_waker;
use static_assertions::{assert_impl_all, assert_not_impl_all};
use super::*; use super::*;
assert_impl_all!(Ready<()>: Send, Sync, Clone);
assert_not_impl_all!(Ready<Rc<()>>: Send, Sync);
#[test] #[test]
#[should_panic] #[should_panic]
fn multiple_poll_panics() { fn multiple_poll_panics() {