1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-08-13 04:58:21 +02:00

Compare commits

..

31 Commits

Author SHA1 Message Date
Rob Ede
f8f1ac94bc add Patterns::is_empty and impl IntoPatterns for Patterns 2021-07-20 08:18:50 +01:00
Rob Ede
82cd5b8290 prepare router release 0.5.0-beta.1 2021-07-20 07:43:50 +01:00
Rob Ede
c65e8524b2 rework resourcedef (#373) 2021-07-19 22:37:54 +01:00
Rob Ede
a83dfaa162 Update macros.rs
closes #234
2021-07-17 20:54:53 +01:00
Rob Ede
e4ec956001 fix examples on msrv 2021-07-17 03:11:25 +01:00
Rob Ede
95cba659ff add zero cost profiling to router 2021-07-17 01:09:29 +01:00
Rob Ede
5687e81d9f rework IntoPatterns trait and codegen (#372) 2021-07-17 01:06:23 +01:00
Rob Ede
a0fe2a9b2e clippy 2021-07-16 21:46:32 +01:00
Rob Ede
ad22a93466 allow path building when resource has tail (#371) 2021-07-16 21:41:57 +01:00
Rob Ede
c2d5b2398a Rename Path::{len => segment_count} (#370) 2021-07-16 19:43:48 +01:00
Ali MJ Al-Nasrawy
5b1ff30dd9 router: fix multi-pattern and path tail matches (#366) 2021-07-16 18:17:00 +01:00
Ali MJ Al-Nasrawy
e1317bb3a0 path.len() != path.path().len() (#368)
Co-authored-by: Rob Ede <robjtede@icloud.com>
2021-07-15 15:34:49 +01:00
Ali MJ Al-Nasrawy
dcea009158 ResourceDef: cleanup (#365) 2021-07-15 15:09:01 +01:00
Rob Ede
13c18b8a51 Update CHANGES.md 2021-07-14 10:37:49 +01:00
Rob Ede
06b17d6a43 fix ci 2021-06-28 15:06:29 +01:00
Rob Ede
605ec25143 prepare macros release 0.2.1 2021-06-08 17:48:30 +01:00
Ibraheem Ahmed
3824493fd3 take custom system path in actix_rt::main macro (#363) 2021-06-08 17:33:05 +01:00
Rob Ede
3be3e11aa5 change actix-router version to 0.4.0 2021-06-06 18:48:27 +01:00
Rob Ede
6a5ea0342b prepare router release 0.3.0 (#362) 2021-06-06 18:43:22 +01:00
Ali MJ Al-Nasrawy
23b1f63345 router: handle newline char '\n' in url (#360) 2021-06-06 03:38:58 +01:00
Ali MJ Al-Nasrawy
3aa037d07d fix changelog (#361) 2021-06-05 19:24:30 +01:00
Ali MJ Al-Nasrawy
cf21df14f2 Path: fix unsafe malformed string (#359) 2021-06-05 18:29:00 +01:00
Ali MJ Al-Nasrawy
a1bf8662c9 router: don't decode %25 to '%' (#357) 2021-06-06 01:34:16 +09:00
Ibraheem Ahmed
6f4d2220fa store Cow in actix-router Path (#345) 2021-06-05 01:46:40 +01:00
Danilo Bargen
54b22f9fce Docs: Fix signature of Service::call (#358) 2021-06-02 21:10:36 +01:00
fakeshadow
983abec77d Fix interrupt handling. Fix double server pause/resume (#353) 2021-04-30 13:42:25 +01:00
fakeshadow
e4d4ae21ee refactor connection counter (#343)
* Remove restart_worker test

* Remove Slab

* Rework counter

* Make counter limit switch accurate

* Remove backpressure. Add pause state

* make changes for review

* fix doc comment for counter
2021-04-29 23:27:08 +08:00
fakeshadow
8ad5f58d38 Remove ServerBuilder::configure (#349) 2021-04-27 23:58:02 +01:00
fakeshadow
613b2be51f Fix Display impl of MioListener (#350) 2021-04-27 11:54:18 -07:00
Rob Ede
b2e9640952 prepare codec 0.4.0 release (#346) 2021-04-21 11:08:43 +01:00
Rob Ede
76338a5822 prepare server release 2.0.0-beta.5 2021-04-20 05:16:32 +01:00
40 changed files with 2441 additions and 1522 deletions

View File

@@ -1,3 +1,3 @@
[alias]
chk = "hack check --workspace --all-features --tests --examples"
lint = "hack --clean-per-run clippy --workspace --tests --examples"
chk = "check --workspace --all-features --tests --examples --bins"
lint = "clippy --workspace --all-features --tests --examples --bins -- -Dclippy::todo"

View File

@@ -127,5 +127,5 @@ jobs:
- name: Clear the cargo caches
run: |
cargo install cargo-cache --no-default-features --features ci-autoclean
cargo install cargo-cache --version 0.6.2 --no-default-features --features ci-autoclean
cargo-cache

View File

@@ -39,4 +39,4 @@ jobs:
uses: actions-rs/clippy-check@v1
with:
token: ${{ secrets.GITHUB_TOKEN }}
args: --workspace --tests --all-features
args: --workspace --all-features --tests --examples --bins -- -Dclippy::todo

View File

@@ -27,3 +27,8 @@ actix-utils = { path = "actix-utils" }
bytestring = { path = "bytestring" }
local-channel = { path = "local-channel" }
local-waker = { path = "local-waker" }
[profile.release]
lto = true
opt-level = 3
codegen-units = 1

View File

@@ -3,6 +3,10 @@
## Unreleased - 2021-xx-xx
## 0.4.0 - 2021-04-20
* No significant changes since v0.4.0-beta.1.
## 0.4.0-beta.1 - 2020-12-28
* Replace `pin-project` with `pin-project-lite`. [#237]
* Upgrade `tokio` dependency to `1`. [#237]
@@ -23,28 +27,28 @@
## 0.3.0-beta.1 - 2020-08-19
* Use `.advance()` instead of `.split_to()`.
* Upgrade `tokio-util` to `0.3`.
* Improve `BytesCodec` `.encode()` performance
* Simplify `BytesCodec` `.decode()`
* Improve `BytesCodec::encode()` performance.
* Simplify `BytesCodec::decode()`.
* Rename methods on `Framed` to better describe their use.
* Add method on `Framed` to get a pinned reference to the underlying I/O.
* Add method on `Framed` check emptiness of read buffer.
## 0.2.0 - 2019-12-10
* Use specific futures dependencies
* Use specific futures dependencies.
## 0.2.0-alpha.4
* Fix buffer remaining capacity calculation
* Fix buffer remaining capacity calculation.
## 0.2.0-alpha.3
* Use tokio 0.2
* Fix low/high watermark for write/read buffers
* Use tokio 0.2.
* Fix low/high watermark for write/read buffers.
## 0.2.0-alpha.2
* Migrated to `std::future`
* Migrated to `std::future`.
## 0.1.2 - 2019-03-27
@@ -56,4 +60,4 @@
## 0.1.0 - 2018-12-09
* Move codec to separate crate
* Move codec to separate crate.

View File

@@ -1,12 +1,10 @@
[package]
name = "actix-codec"
version = "0.4.0-beta.1"
version = "0.4.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Codec utilities for working with framed protocols"
keywords = ["network", "framework", "async", "futures"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-codec"
repository = "https://github.com/actix/actix-net"
categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0"
edition = "2018"

View File

@@ -3,6 +3,12 @@
## Unreleased - 2021-xx-xx
## 0.2.1 - 2021-02-02
* Add optional argument `system` to `main` macro which can be used to specify the path to `actix_rt::System` (useful for re-exports). [#363]
[#363]: https://github.com/actix/actix-net/pull/363
## 0.2.0 - 2021-02-02
* Update to latest `actix_rt::System::new` signature. [#261]

View File

@@ -1,10 +1,12 @@
[package]
name = "actix-macros"
version = "0.2.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
version = "0.2.1"
authors = [
"Nikolay Kim <fafhrd91@gmail.com>",
"Ibraheem Ahmed <ibrah1440@gmail.com>",
]
description = "Macros for Actix system and runtime"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-macros"
repository = "https://github.com/actix/actix-net"
categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0"
edition = "2018"

View File

@@ -27,8 +27,10 @@ use quote::quote;
#[allow(clippy::needless_doctest_main)]
#[proc_macro_attribute]
#[cfg(not(test))] // Work around for rust-lang/rust#62127
pub fn main(_: TokenStream, item: TokenStream) -> TokenStream {
pub fn main(args: TokenStream, item: TokenStream) -> TokenStream {
let mut input = syn::parse_macro_input!(item as syn::ItemFn);
let args = syn::parse_macro_input!(args as syn::AttributeArgs);
let attrs = &input.attrs;
let vis = &input.vis;
let sig = &mut input.sig;
@@ -43,13 +45,47 @@ pub fn main(_: TokenStream, item: TokenStream) -> TokenStream {
.into();
}
let mut system = syn::parse_str::<syn::Path>("::actix_rt::System").unwrap();
for arg in &args {
match arg {
syn::NestedMeta::Meta(syn::Meta::NameValue(syn::MetaNameValue {
lit: syn::Lit::Str(lit),
path,
..
})) => match path
.get_ident()
.map(|i| i.to_string().to_lowercase())
.as_deref()
{
Some("system") => match lit.parse() {
Ok(path) => system = path,
Err(_) => {
return syn::Error::new_spanned(lit, "Expected path")
.to_compile_error()
.into();
}
},
_ => {
return syn::Error::new_spanned(arg, "Unknown attribute specified")
.to_compile_error()
.into();
}
},
_ => {
return syn::Error::new_spanned(arg, "Unknown attribute specified")
.to_compile_error()
.into();
}
}
}
sig.asyncness = None;
(quote! {
#(#attrs)*
#vis #sig {
actix_rt::System::new()
.block_on(async move { #body })
<#system>::new().block_on(async move { #body })
}
})
.into()

View File

@@ -4,6 +4,9 @@ fn compile_macros() {
t.pass("tests/trybuild/main-01-basic.rs");
t.compile_fail("tests/trybuild/main-02-only-async.rs");
t.pass("tests/trybuild/main-03-fn-params.rs");
t.pass("tests/trybuild/main-04-system-path.rs");
t.compile_fail("tests/trybuild/main-05-system-expect-path.rs");
t.compile_fail("tests/trybuild/main-06-unknown-attr.rs");
t.pass("tests/trybuild/test-01-basic.rs");
t.pass("tests/trybuild/test-02-keep-attrs.rs");

View File

@@ -0,0 +1,8 @@
mod system {
pub use actix_rt::System as MySystem;
}
#[actix_rt::main(system = "system::MySystem")]
async fn main() {
futures_util::future::ready(()).await
}

View File

@@ -0,0 +1,4 @@
#[actix_rt::main(system = "!@#*&")]
async fn main2() {}
fn main() {}

View File

@@ -0,0 +1,5 @@
error: Expected path
--> $DIR/main-05-system-expect-path.rs:1:27
|
1 | #[actix_rt::main(system = "!@#*&")]
| ^^^^^^^

View File

@@ -0,0 +1,7 @@
#[actix_rt::main(foo = "bar")]
async fn async_main() {}
#[actix_rt::main(bar::baz)]
async fn async_main2() {}
fn main() {}

View File

@@ -0,0 +1,11 @@
error: Unknown attribute specified
--> $DIR/main-06-unknown-attr.rs:1:18
|
1 | #[actix_rt::main(foo = "bar")]
| ^^^^^^^^^^^
error: Unknown attribute specified
--> $DIR/main-06-unknown-attr.rs:4:18
|
4 | #[actix_rt::main(bar::baz)]
| ^^^^^^^^

View File

@@ -3,6 +3,50 @@
## Unreleased - 2021-xx-xx
## 0.5.0-beta.1 - 2021-07-20
* Fix a bug in multi-patterns where static patterns are interpreted as regex. [#366]
* Introduce `ResourceDef::pattern_iter` to get an iterator over all patterns in a multi-pattern resource. [#373]
* Fix segment interpolation leaving `Path` in unintended state after matching. [#368]
* Fix `ResourceDef` `PartialEq` implementation. [#373]
* Re-work `IntoPatterns` trait, adding a `Patterns` enum. [#372]
* Implement `IntoPatterns` for `bytestring::ByteString`. [#372]
* Rename `Path::{len => segment_count}` to be more descriptive of it's purpose. [#370]
* Rename `ResourceDef::{resource_path => resource_path_from_iter}`. [#371]
* `ResourceDef::resource_path_from_iter` now takes an `IntoIterator`. [#373]
* Rename `ResourceDef::{resource_path_named => resource_path_from_map}`. [#371]
* Rename `ResourceDef::{is_prefix_match => find_match}`. [#373]
* Rename `ResourceDef::{match_path => capture_match_info}`. [#373]
* Rename `ResourceDef::{match_path_checked => capture_match_info_fn}`. [#373]
* Remove `ResourceDef::name_mut` and introduce `ResourceDef::set_name`. [#373]
* Rename `Router::{*_checked => *_fn}`. [#373]
* Return type of `ResourceDef::name` is now `Option<&str>`. [#373]
* Return type of `ResourceDef::pattern` is now `Option<&str>`. [#373]
[#368]: https://github.com/actix/actix-net/pull/368
[#366]: https://github.com/actix/actix-net/pull/366
[#368]: https://github.com/actix/actix-net/pull/368
[#370]: https://github.com/actix/actix-net/pull/370
[#371]: https://github.com/actix/actix-net/pull/371
[#372]: https://github.com/actix/actix-net/pull/372
[#373]: https://github.com/actix/actix-net/pull/373
## 0.4.0 - 2021-06-06
* When matching path parameters, `%25` is now kept in the percent-encoded form; no longer decoded to `%`. [#357]
* Path tail patterns now match new lines (`\n`) in request URL. [#360]
* Fixed a safety bug where `Path` could return a malformed string after percent decoding. [#359]
* Methods `Path::{add, add_static}` now take `impl Into<Cow<'static, str>>`. [#345]
[#345]: https://github.com/actix/actix-net/pull/345
[#357]: https://github.com/actix/actix-net/pull/357
[#359]: https://github.com/actix/actix-net/pull/359
[#360]: https://github.com/actix/actix-net/pull/360
## 0.3.0 - 2019-12-31
* Version was yanked previously. See https://crates.io/crates/actix-router/0.3.0
## 0.2.7 - 2021-02-06
* Add `Router::recognize_checked` [#247]

View File

@@ -1,12 +1,14 @@
[package]
name = "actix-router"
version = "0.2.7"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Resource path matching library"
version = "0.5.0-beta.1"
authors = [
"Nikolay Kim <fafhrd91@gmail.com>",
"Ali MJ Al-Nasrawy <alimjalnasrawy@gmail.com>",
"Rob Ede <robjtede@icloud.com>",
]
description = "Resource path matching and router"
keywords = ["actix", "router", "routing"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-router"
license = "MIT OR Apache-2.0"
edition = "2018"
@@ -18,12 +20,19 @@ path = "src/lib.rs"
default = ["http"]
[dependencies]
regex = "1.3.1"
serde = "1.0.104"
bytestring = ">=0.1.5, <2"
log = "0.4.8"
http = { version = "0.2.2", optional = true }
firestorm = "0.4"
http = { version = "0.2.3", optional = true }
log = "0.4"
regex = "1.5"
serde = "1"
[dev-dependencies]
http = "0.2.2"
serde_derive = "1.0"
criterion = { version = "0.3", features = ["html_reports"] }
firestorm = { version = "0.4", features = ["enable_system_time"] }
http = "0.2.3"
serde = { version = "1", features = ["derive"] }
[[bench]]
name = "router"
harness = false

View File

@@ -0,0 +1,194 @@
//! Based on https://github.com/ibraheemdev/matchit/blob/master/benches/bench.rs
use criterion::{black_box, criterion_group, criterion_main, Criterion};
macro_rules! register {
(colon) => {{
register!(finish => ":p1", ":p2", ":p3", ":p4")
}};
(brackets) => {{
register!(finish => "{p1}", "{p2}", "{p3}", "{p4}")
}};
(regex) => {{
register!(finish => "(.*)", "(.*)", "(.*)", "(.*)")
}};
(finish => $p1:literal, $p2:literal, $p3:literal, $p4:literal) => {{
let arr = [
concat!("/authorizations"),
concat!("/authorizations/", $p1),
concat!("/applications/", $p1, "/tokens/", $p2),
concat!("/events"),
concat!("/repos/", $p1, "/", $p2, "/events"),
concat!("/networks/", $p1, "/", $p2, "/events"),
concat!("/orgs/", $p1, "/events"),
concat!("/users/", $p1, "/received_events"),
concat!("/users/", $p1, "/received_events/public"),
concat!("/users/", $p1, "/events"),
concat!("/users/", $p1, "/events/public"),
concat!("/users/", $p1, "/events/orgs/", $p2),
concat!("/feeds"),
concat!("/notifications"),
concat!("/repos/", $p1, "/", $p2, "/notifications"),
concat!("/notifications/threads/", $p1),
concat!("/notifications/threads/", $p1, "/subscription"),
concat!("/repos/", $p1, "/", $p2, "/stargazers"),
concat!("/users/", $p1, "/starred"),
concat!("/user/starred"),
concat!("/user/starred/", $p1, "/", $p2),
concat!("/repos/", $p1, "/", $p2, "/subscribers"),
concat!("/users/", $p1, "/subscriptions"),
concat!("/user/subscriptions"),
concat!("/repos/", $p1, "/", $p2, "/subscription"),
concat!("/user/subscriptions/", $p1, "/", $p2),
concat!("/users/", $p1, "/gists"),
concat!("/gists"),
concat!("/gists/", $p1),
concat!("/gists/", $p1, "/star"),
concat!("/repos/", $p1, "/", $p2, "/git/blobs/", $p3),
concat!("/repos/", $p1, "/", $p2, "/git/commits/", $p3),
concat!("/repos/", $p1, "/", $p2, "/git/refs"),
concat!("/repos/", $p1, "/", $p2, "/git/tags/", $p3),
concat!("/repos/", $p1, "/", $p2, "/git/trees/", $p3),
concat!("/issues"),
concat!("/user/issues"),
concat!("/orgs/", $p1, "/issues"),
concat!("/repos/", $p1, "/", $p2, "/issues"),
concat!("/repos/", $p1, "/", $p2, "/issues/", $p3),
concat!("/repos/", $p1, "/", $p2, "/assignees"),
concat!("/repos/", $p1, "/", $p2, "/assignees/", $p3),
concat!("/repos/", $p1, "/", $p2, "/issues/", $p3, "/comments"),
concat!("/repos/", $p1, "/", $p2, "/issues/", $p3, "/events"),
concat!("/repos/", $p1, "/", $p2, "/labels"),
concat!("/repos/", $p1, "/", $p2, "/labels/", $p3),
concat!("/repos/", $p1, "/", $p2, "/issues/", $p3, "/labels"),
concat!("/repos/", $p1, "/", $p2, "/milestones/", $p3, "/labels"),
concat!("/repos/", $p1, "/", $p2, "/milestones/"),
concat!("/repos/", $p1, "/", $p2, "/milestones/", $p3),
concat!("/emojis"),
concat!("/gitignore/templates"),
concat!("/gitignore/templates/", $p1),
concat!("/meta"),
concat!("/rate_limit"),
concat!("/users/", $p1, "/orgs"),
concat!("/user/orgs"),
concat!("/orgs/", $p1),
concat!("/orgs/", $p1, "/members"),
concat!("/orgs/", $p1, "/members", $p2),
concat!("/orgs/", $p1, "/public_members"),
concat!("/orgs/", $p1, "/public_members/", $p2),
concat!("/orgs/", $p1, "/teams"),
concat!("/teams/", $p1),
concat!("/teams/", $p1, "/members"),
concat!("/teams/", $p1, "/members", $p2),
concat!("/teams/", $p1, "/repos"),
concat!("/teams/", $p1, "/repos/", $p2, "/", $p3),
concat!("/user/teams"),
concat!("/repos/", $p1, "/", $p2, "/pulls"),
concat!("/repos/", $p1, "/", $p2, "/pulls/", $p3),
concat!("/repos/", $p1, "/", $p2, "/pulls/", $p3, "/commits"),
concat!("/repos/", $p1, "/", $p2, "/pulls/", $p3, "/files"),
concat!("/repos/", $p1, "/", $p2, "/pulls/", $p3, "/merge"),
concat!("/repos/", $p1, "/", $p2, "/pulls/", $p3, "/comments"),
concat!("/user/repos"),
concat!("/users/", $p1, "/repos"),
concat!("/orgs/", $p1, "/repos"),
concat!("/repositories"),
concat!("/repos/", $p1, "/", $p2),
concat!("/repos/", $p1, "/", $p2, "/contributors"),
concat!("/repos/", $p1, "/", $p2, "/languages"),
concat!("/repos/", $p1, "/", $p2, "/teams"),
concat!("/repos/", $p1, "/", $p2, "/tags"),
concat!("/repos/", $p1, "/", $p2, "/branches"),
concat!("/repos/", $p1, "/", $p2, "/branches/", $p3),
concat!("/repos/", $p1, "/", $p2, "/collaborators"),
concat!("/repos/", $p1, "/", $p2, "/collaborators/", $p3),
concat!("/repos/", $p1, "/", $p2, "/comments"),
concat!("/repos/", $p1, "/", $p2, "/commits/", $p3, "/comments"),
concat!("/repos/", $p1, "/", $p2, "/commits"),
concat!("/repos/", $p1, "/", $p2, "/commits/", $p3),
concat!("/repos/", $p1, "/", $p2, "/readme"),
concat!("/repos/", $p1, "/", $p2, "/keys"),
concat!("/repos/", $p1, "/", $p2, "/keys", $p3),
concat!("/repos/", $p1, "/", $p2, "/downloads"),
concat!("/repos/", $p1, "/", $p2, "/downloads", $p3),
concat!("/repos/", $p1, "/", $p2, "/forks"),
concat!("/repos/", $p1, "/", $p2, "/hooks"),
concat!("/repos/", $p1, "/", $p2, "/hooks", $p3),
concat!("/repos/", $p1, "/", $p2, "/releases"),
concat!("/repos/", $p1, "/", $p2, "/releases/", $p3),
concat!("/repos/", $p1, "/", $p2, "/releases/", $p3, "/assets"),
concat!("/repos/", $p1, "/", $p2, "/stats/contributors"),
concat!("/repos/", $p1, "/", $p2, "/stats/commit_activity"),
concat!("/repos/", $p1, "/", $p2, "/stats/code_frequency"),
concat!("/repos/", $p1, "/", $p2, "/stats/participation"),
concat!("/repos/", $p1, "/", $p2, "/stats/punch_card"),
concat!("/repos/", $p1, "/", $p2, "/statuses/", $p3),
concat!("/search/repositories"),
concat!("/search/code"),
concat!("/search/issues"),
concat!("/search/users"),
concat!("/legacy/issues/search/", $p1, "/", $p2, "/", $p3, "/", $p4),
concat!("/legacy/repos/search/", $p1),
concat!("/legacy/user/search/", $p1),
concat!("/legacy/user/email/", $p1),
concat!("/users/", $p1),
concat!("/user"),
concat!("/users"),
concat!("/user/emails"),
concat!("/users/", $p1, "/followers"),
concat!("/user/followers"),
concat!("/users/", $p1, "/following"),
concat!("/user/following"),
concat!("/user/following/", $p1),
concat!("/users/", $p1, "/following", $p2),
concat!("/users/", $p1, "/keys"),
concat!("/user/keys"),
concat!("/user/keys/", $p1),
];
std::array::IntoIter::new(arr)
}};
}
fn call() -> impl Iterator<Item = &'static str> {
let arr = [
"/authorizations",
"/user/repos",
"/repos/rust-lang/rust/stargazers",
"/orgs/rust-lang/public_members/nikomatsakis",
"/repos/rust-lang/rust/releases/1.51.0",
];
std::array::IntoIter::new(arr)
}
fn compare_routers(c: &mut Criterion) {
let mut group = c.benchmark_group("Compare Routers");
let mut actix = actix_router::Router::<bool>::build();
for route in register!(brackets) {
actix.path(route, true);
}
let actix = actix.finish();
group.bench_function("actix", |b| {
b.iter(|| {
for route in call() {
let mut path = actix_router::Path::new(route);
black_box(actix.recognize(&mut path).unwrap());
}
});
});
let regex_set = regex::RegexSet::new(register!(regex)).unwrap();
group.bench_function("regex", |b| {
b.iter(|| {
for route in call() {
black_box(regex_set.matches(route));
}
});
});
group.finish();
}
criterion_group!(benches, compare_routers);
criterion_main!(benches);

View File

@@ -0,0 +1,169 @@
macro_rules! register {
(brackets) => {{
register!(finish => "{p1}", "{p2}", "{p3}", "{p4}")
}};
(finish => $p1:literal, $p2:literal, $p3:literal, $p4:literal) => {{
let arr = [
concat!("/authorizations"),
concat!("/authorizations/", $p1),
concat!("/applications/", $p1, "/tokens/", $p2),
concat!("/events"),
concat!("/repos/", $p1, "/", $p2, "/events"),
concat!("/networks/", $p1, "/", $p2, "/events"),
concat!("/orgs/", $p1, "/events"),
concat!("/users/", $p1, "/received_events"),
concat!("/users/", $p1, "/received_events/public"),
concat!("/users/", $p1, "/events"),
concat!("/users/", $p1, "/events/public"),
concat!("/users/", $p1, "/events/orgs/", $p2),
concat!("/feeds"),
concat!("/notifications"),
concat!("/repos/", $p1, "/", $p2, "/notifications"),
concat!("/notifications/threads/", $p1),
concat!("/notifications/threads/", $p1, "/subscription"),
concat!("/repos/", $p1, "/", $p2, "/stargazers"),
concat!("/users/", $p1, "/starred"),
concat!("/user/starred"),
concat!("/user/starred/", $p1, "/", $p2),
concat!("/repos/", $p1, "/", $p2, "/subscribers"),
concat!("/users/", $p1, "/subscriptions"),
concat!("/user/subscriptions"),
concat!("/repos/", $p1, "/", $p2, "/subscription"),
concat!("/user/subscriptions/", $p1, "/", $p2),
concat!("/users/", $p1, "/gists"),
concat!("/gists"),
concat!("/gists/", $p1),
concat!("/gists/", $p1, "/star"),
concat!("/repos/", $p1, "/", $p2, "/git/blobs/", $p3),
concat!("/repos/", $p1, "/", $p2, "/git/commits/", $p3),
concat!("/repos/", $p1, "/", $p2, "/git/refs"),
concat!("/repos/", $p1, "/", $p2, "/git/tags/", $p3),
concat!("/repos/", $p1, "/", $p2, "/git/trees/", $p3),
concat!("/issues"),
concat!("/user/issues"),
concat!("/orgs/", $p1, "/issues"),
concat!("/repos/", $p1, "/", $p2, "/issues"),
concat!("/repos/", $p1, "/", $p2, "/issues/", $p3),
concat!("/repos/", $p1, "/", $p2, "/assignees"),
concat!("/repos/", $p1, "/", $p2, "/assignees/", $p3),
concat!("/repos/", $p1, "/", $p2, "/issues/", $p3, "/comments"),
concat!("/repos/", $p1, "/", $p2, "/issues/", $p3, "/events"),
concat!("/repos/", $p1, "/", $p2, "/labels"),
concat!("/repos/", $p1, "/", $p2, "/labels/", $p3),
concat!("/repos/", $p1, "/", $p2, "/issues/", $p3, "/labels"),
concat!("/repos/", $p1, "/", $p2, "/milestones/", $p3, "/labels"),
concat!("/repos/", $p1, "/", $p2, "/milestones/"),
concat!("/repos/", $p1, "/", $p2, "/milestones/", $p3),
concat!("/emojis"),
concat!("/gitignore/templates"),
concat!("/gitignore/templates/", $p1),
concat!("/meta"),
concat!("/rate_limit"),
concat!("/users/", $p1, "/orgs"),
concat!("/user/orgs"),
concat!("/orgs/", $p1),
concat!("/orgs/", $p1, "/members"),
concat!("/orgs/", $p1, "/members", $p2),
concat!("/orgs/", $p1, "/public_members"),
concat!("/orgs/", $p1, "/public_members/", $p2),
concat!("/orgs/", $p1, "/teams"),
concat!("/teams/", $p1),
concat!("/teams/", $p1, "/members"),
concat!("/teams/", $p1, "/members", $p2),
concat!("/teams/", $p1, "/repos"),
concat!("/teams/", $p1, "/repos/", $p2, "/", $p3),
concat!("/user/teams"),
concat!("/repos/", $p1, "/", $p2, "/pulls"),
concat!("/repos/", $p1, "/", $p2, "/pulls/", $p3),
concat!("/repos/", $p1, "/", $p2, "/pulls/", $p3, "/commits"),
concat!("/repos/", $p1, "/", $p2, "/pulls/", $p3, "/files"),
concat!("/repos/", $p1, "/", $p2, "/pulls/", $p3, "/merge"),
concat!("/repos/", $p1, "/", $p2, "/pulls/", $p3, "/comments"),
concat!("/user/repos"),
concat!("/users/", $p1, "/repos"),
concat!("/orgs/", $p1, "/repos"),
concat!("/repositories"),
concat!("/repos/", $p1, "/", $p2),
concat!("/repos/", $p1, "/", $p2, "/contributors"),
concat!("/repos/", $p1, "/", $p2, "/languages"),
concat!("/repos/", $p1, "/", $p2, "/teams"),
concat!("/repos/", $p1, "/", $p2, "/tags"),
concat!("/repos/", $p1, "/", $p2, "/branches"),
concat!("/repos/", $p1, "/", $p2, "/branches/", $p3),
concat!("/repos/", $p1, "/", $p2, "/collaborators"),
concat!("/repos/", $p1, "/", $p2, "/collaborators/", $p3),
concat!("/repos/", $p1, "/", $p2, "/comments"),
concat!("/repos/", $p1, "/", $p2, "/commits/", $p3, "/comments"),
concat!("/repos/", $p1, "/", $p2, "/commits"),
concat!("/repos/", $p1, "/", $p2, "/commits/", $p3),
concat!("/repos/", $p1, "/", $p2, "/readme"),
concat!("/repos/", $p1, "/", $p2, "/keys"),
concat!("/repos/", $p1, "/", $p2, "/keys", $p3),
concat!("/repos/", $p1, "/", $p2, "/downloads"),
concat!("/repos/", $p1, "/", $p2, "/downloads", $p3),
concat!("/repos/", $p1, "/", $p2, "/forks"),
concat!("/repos/", $p1, "/", $p2, "/hooks"),
concat!("/repos/", $p1, "/", $p2, "/hooks", $p3),
concat!("/repos/", $p1, "/", $p2, "/releases"),
concat!("/repos/", $p1, "/", $p2, "/releases/", $p3),
concat!("/repos/", $p1, "/", $p2, "/releases/", $p3, "/assets"),
concat!("/repos/", $p1, "/", $p2, "/stats/contributors"),
concat!("/repos/", $p1, "/", $p2, "/stats/commit_activity"),
concat!("/repos/", $p1, "/", $p2, "/stats/code_frequency"),
concat!("/repos/", $p1, "/", $p2, "/stats/participation"),
concat!("/repos/", $p1, "/", $p2, "/stats/punch_card"),
concat!("/repos/", $p1, "/", $p2, "/statuses/", $p3),
concat!("/search/repositories"),
concat!("/search/code"),
concat!("/search/issues"),
concat!("/search/users"),
concat!("/legacy/issues/search/", $p1, "/", $p2, "/", $p3, "/", $p4),
concat!("/legacy/repos/search/", $p1),
concat!("/legacy/user/search/", $p1),
concat!("/legacy/user/email/", $p1),
concat!("/users/", $p1),
concat!("/user"),
concat!("/users"),
concat!("/user/emails"),
concat!("/users/", $p1, "/followers"),
concat!("/user/followers"),
concat!("/users/", $p1, "/following"),
concat!("/user/following"),
concat!("/user/following/", $p1),
concat!("/users/", $p1, "/following", $p2),
concat!("/users/", $p1, "/keys"),
concat!("/user/keys"),
concat!("/user/keys/", $p1),
];
arr.to_vec()
}};
}
static PATHS: [&str; 5] = [
"/authorizations",
"/user/repos",
"/repos/rust-lang/rust/stargazers",
"/orgs/rust-lang/public_members/nikomatsakis",
"/repos/rust-lang/rust/releases/1.51.0",
];
fn main() {
let mut router = actix_router::Router::<bool>::build();
for route in register!(brackets) {
router.path(route, true);
}
let actix = router.finish();
if firestorm::enabled() {
firestorm::bench("target", || {
for &route in &PATHS {
let mut path = actix_router::Path::new(route);
actix.recognize(&mut path).unwrap();
}
})
.unwrap();
}
}

View File

@@ -24,10 +24,13 @@ macro_rules! parse_single_value {
where
V: Visitor<'de>,
{
if self.path.len() != 1 {
if self.path.segment_count() != 1 {
Err(de::value::Error::custom(
format!("wrong number of parameters: {} expected 1", self.path.len())
.as_str(),
format!(
"wrong number of parameters: {} expected 1",
self.path.segment_count()
)
.as_str(),
))
} else {
let v = self.path[0].parse().map_err(|_| {
@@ -110,11 +113,11 @@ impl<'de, T: ResourcePath + 'de> Deserializer<'de> for PathDeserializer<'de, T>
where
V: Visitor<'de>,
{
if self.path.len() < len {
if self.path.segment_count() < len {
Err(de::value::Error::custom(
format!(
"wrong number of parameters: {} expected {}",
self.path.len(),
self.path.segment_count(),
len
)
.as_str(),
@@ -135,11 +138,11 @@ impl<'de, T: ResourcePath + 'de> Deserializer<'de> for PathDeserializer<'de, T>
where
V: Visitor<'de>,
{
if self.path.len() < len {
if self.path.segment_count() < len {
Err(de::value::Error::custom(
format!(
"wrong number of parameters: {} expected {}",
self.path.len(),
self.path.segment_count(),
len
)
.as_str(),
@@ -173,9 +176,13 @@ impl<'de, T: ResourcePath + 'de> Deserializer<'de> for PathDeserializer<'de, T>
where
V: Visitor<'de>,
{
if self.path.len() != 1 {
if self.path.segment_count() != 1 {
Err(de::value::Error::custom(
format!("wrong number of parameters: {} expected 1", self.path.len()).as_str(),
format!(
"wrong number of parameters: {} expected 1",
self.path.segment_count()
)
.as_str(),
))
} else {
visitor.visit_str(&self.path[0])
@@ -485,8 +492,7 @@ impl<'de> de::VariantAccess<'de> for UnitVariant {
#[cfg(test)]
mod tests {
use serde::de;
use serde_derive::Deserialize;
use serde::{de, Deserialize};
use super::*;
use crate::path::Path;

View File

@@ -1,4 +1,4 @@
//! Resource path matching library.
//! Resource path matching and router.
#![deny(rust_2018_idioms, nonstandard_style)]
#![doc(html_logo_url = "https://actix.rs/img/logo.png")]
@@ -14,6 +14,8 @@ pub use self::path::Path;
pub use self::resource::ResourceDef;
pub use self::router::{ResourceInfo, Router, RouterBuilder};
// TODO: this trait is necessary, document it
// see impl Resource for ServiceRequest
pub trait Resource<T: ResourcePath> {
fn resource_path(&mut self) -> &mut Path<T>;
}
@@ -40,98 +42,92 @@ impl ResourcePath for bytestring::ByteString {
}
}
/// Helper trait for type that could be converted to path pattern
pub trait IntoPattern {
fn is_single(&self) -> bool;
fn patterns(&self) -> Vec<String>;
/// One or many patterns.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Patterns {
Single(String),
List(Vec<String>),
}
impl IntoPattern for String {
fn is_single(&self) -> bool {
true
}
fn patterns(&self) -> Vec<String> {
vec![self.clone()]
}
}
impl<'a> IntoPattern for &'a String {
fn is_single(&self) -> bool {
true
}
fn patterns(&self) -> Vec<String> {
vec![self.as_str().to_string()]
}
}
impl<'a> IntoPattern for &'a str {
fn is_single(&self) -> bool {
true
}
fn patterns(&self) -> Vec<String> {
vec![(*self).to_string()]
}
}
impl<T: AsRef<str>> IntoPattern for Vec<T> {
fn is_single(&self) -> bool {
self.len() == 1
}
fn patterns(&self) -> Vec<String> {
self.iter().map(|v| v.as_ref().to_string()).collect()
}
}
macro_rules! array_patterns (($tp:ty, $num:tt) => {
impl IntoPattern for [$tp; $num] {
fn is_single(&self) -> bool {
$num == 1
impl Patterns {
pub fn is_empty(&self) -> bool {
match self {
Patterns::Single(_) => false,
Patterns::List(pats) => pats.is_empty(),
}
}
}
fn patterns(&self) -> Vec<String> {
self.iter().map(|v| v.to_string()).collect()
/// Helper trait for type that could be converted to one or more path pattern.
pub trait IntoPatterns {
fn patterns(&self) -> Patterns;
}
impl IntoPatterns for String {
fn patterns(&self) -> Patterns {
Patterns::Single(self.clone())
}
}
impl<'a> IntoPatterns for &'a String {
fn patterns(&self) -> Patterns {
Patterns::Single((*self).clone())
}
}
impl<'a> IntoPatterns for &'a str {
fn patterns(&self) -> Patterns {
Patterns::Single((*self).to_owned())
}
}
impl IntoPatterns for bytestring::ByteString {
fn patterns(&self) -> Patterns {
Patterns::Single(self.to_string())
}
}
impl IntoPatterns for Patterns {
fn patterns(&self) -> Patterns {
self.clone()
}
}
impl<T: AsRef<str>> IntoPatterns for Vec<T> {
fn patterns(&self) -> Patterns {
let mut patterns = self.iter().map(|v| v.as_ref().to_owned());
match patterns.size_hint() {
(1, _) => Patterns::Single(patterns.next().unwrap()),
_ => Patterns::List(patterns.collect()),
}
}
}
macro_rules! array_patterns_single (($tp:ty) => {
impl IntoPatterns for [$tp; 1] {
fn patterns(&self) -> Patterns {
Patterns::Single(self[0].to_owned())
}
}
});
array_patterns!(&str, 1);
array_patterns!(&str, 2);
array_patterns!(&str, 3);
array_patterns!(&str, 4);
array_patterns!(&str, 5);
array_patterns!(&str, 6);
array_patterns!(&str, 7);
array_patterns!(&str, 8);
array_patterns!(&str, 9);
array_patterns!(&str, 10);
array_patterns!(&str, 11);
array_patterns!(&str, 12);
array_patterns!(&str, 13);
array_patterns!(&str, 14);
array_patterns!(&str, 15);
array_patterns!(&str, 16);
macro_rules! array_patterns_multiple (($tp:ty, $str_fn:expr, $($num:tt) +) => {
// for each array length specified in $num
$(
impl IntoPatterns for [$tp; $num] {
fn patterns(&self) -> Patterns {
Patterns::List(self.iter().map($str_fn).collect())
}
}
)+
});
array_patterns!(String, 1);
array_patterns!(String, 2);
array_patterns!(String, 3);
array_patterns!(String, 4);
array_patterns!(String, 5);
array_patterns!(String, 6);
array_patterns!(String, 7);
array_patterns!(String, 8);
array_patterns!(String, 9);
array_patterns!(String, 10);
array_patterns!(String, 11);
array_patterns!(String, 12);
array_patterns!(String, 13);
array_patterns!(String, 14);
array_patterns!(String, 15);
array_patterns!(String, 16);
array_patterns_single!(&str);
array_patterns_multiple!(&str, |&v| v.to_owned(), 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16);
array_patterns_single!(String);
array_patterns_multiple!(String, |v| v.clone(), 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16);
#[cfg(feature = "http")]
mod url;
@@ -140,10 +136,11 @@ mod url;
pub use self::url::{Quoter, Url};
#[cfg(feature = "http")]
mod http_support {
use super::ResourcePath;
mod http_impls {
use http::Uri;
use super::ResourcePath;
impl ResourcePath for Uri {
fn path(&self) -> &str {
self.path()

View File

@@ -1,44 +1,31 @@
use std::borrow::Cow;
use std::ops::Index;
use firestorm::profile_method;
use serde::de;
use crate::de::PathDeserializer;
use crate::{Resource, ResourcePath};
use crate::{de::PathDeserializer, Resource, ResourcePath};
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone)]
pub(crate) enum PathItem {
Static(&'static str),
Static(Cow<'static, str>),
Segment(u16, u16),
}
/// Resource path match information
impl Default for PathItem {
fn default() -> Self {
Self::Static(Cow::Borrowed(""))
}
}
/// Resource path match information.
///
/// If resource path contains variable patterns, `Path` stores them.
#[derive(Debug)]
#[derive(Debug, Clone, Default)]
pub struct Path<T> {
path: T,
pub(crate) skip: u16,
pub(crate) segments: Vec<(&'static str, PathItem)>,
}
impl<T: Default> Default for Path<T> {
fn default() -> Self {
Path {
path: T::default(),
skip: 0,
segments: Vec::new(),
}
}
}
impl<T: Clone> Clone for Path<T> {
fn clone(&self) -> Self {
Path {
path: self.path.clone(),
skip: self.skip,
segments: self.segments.clone(),
}
}
pub(crate) segments: Vec<(Cow<'static, str>, PathItem)>,
}
impl<T: ResourcePath> Path<T> {
@@ -50,21 +37,23 @@ impl<T: ResourcePath> Path<T> {
}
}
/// Get reference to inner path instance
/// Get reference to inner path instance.
#[inline]
pub fn get_ref(&self) -> &T {
&self.path
}
/// Get mutable reference to inner path instance
/// Get mutable reference to inner path instance.
#[inline]
pub fn get_mut(&mut self) -> &mut T {
&mut self.path
}
/// Path
/// Path.
#[inline]
pub fn path(&self) -> &str {
profile_method!(path);
let skip = self.skip as usize;
let path = self.path.path();
if skip <= path.len() {
@@ -74,7 +63,7 @@ impl<T: ResourcePath> Path<T> {
}
}
/// Set new path
/// Set new path.
#[inline]
pub fn set(&mut self, path: T) {
self.skip = 0;
@@ -82,62 +71,69 @@ impl<T: ResourcePath> Path<T> {
self.segments.clear();
}
/// Reset state
/// Reset state.
#[inline]
pub fn reset(&mut self) {
self.skip = 0;
self.segments.clear();
}
/// Skip first `n` chars in path
/// Skip first `n` chars in path.
#[inline]
pub fn skip(&mut self, n: u16) {
self.skip += n;
}
pub(crate) fn add(&mut self, name: &'static str, value: PathItem) {
pub(crate) fn add(&mut self, name: impl Into<Cow<'static, str>>, value: PathItem) {
profile_method!(add);
match value {
PathItem::Static(s) => self.segments.push((name, PathItem::Static(s))),
PathItem::Segment(begin, end) => self
.segments
.push((name, PathItem::Segment(self.skip + begin, self.skip + end))),
PathItem::Static(s) => self.segments.push((name.into(), PathItem::Static(s))),
PathItem::Segment(begin, end) => self.segments.push((
name.into(),
PathItem::Segment(self.skip + begin, self.skip + end),
)),
}
}
#[doc(hidden)]
pub fn add_static(&mut self, name: &'static str, value: &'static str) {
self.segments.push((name, PathItem::Static(value)));
pub fn add_static(
&mut self,
name: impl Into<Cow<'static, str>>,
value: impl Into<Cow<'static, str>>,
) {
self.segments
.push((name.into(), PathItem::Static(value.into())));
}
/// Check if there are any matched patterns
/// Check if there are any matched patterns.
#[inline]
pub fn is_empty(&self) -> bool {
self.segments.is_empty()
}
/// Check number of extracted parameters
/// Returns number of interpolated segments.
#[inline]
pub fn len(&self) -> usize {
pub fn segment_count(&self) -> usize {
self.segments.len()
}
/// Get matched parameter by name without type conversion
pub fn get(&self, key: &str) -> Option<&str> {
for item in self.segments.iter() {
if key == item.0 {
return match item.1 {
pub fn get(&self, name: &str) -> Option<&str> {
profile_method!(get);
for (seg_name, val) in self.segments.iter() {
if name == seg_name {
return match val {
PathItem::Static(ref s) => Some(&s),
PathItem::Segment(s, e) => {
Some(&self.path.path()[(s as usize)..(e as usize)])
Some(&self.path.path()[(*s as usize)..(*e as usize)])
}
};
}
}
if key == "tail" {
Some(&self.path.path()[(self.skip as usize)..])
} else {
None
}
None
}
/// Get unprocessed part of the path
@@ -147,9 +143,10 @@ impl<T: ResourcePath> Path<T> {
/// Get matched parameter by name.
///
/// If keyed parameter is not available empty string is used as default
/// value.
/// If keyed parameter is not available empty string is used as default value.
pub fn query(&self, key: &str) -> &str {
profile_method!(query);
if let Some(s) = self.get(key) {
s
} else {
@@ -157,7 +154,7 @@ impl<T: ResourcePath> Path<T> {
}
}
/// Return iterator to items in parameter container
/// Return iterator to items in parameter container.
pub fn iter(&self) -> PathIter<'_, T> {
PathIter {
idx: 0,
@@ -167,6 +164,7 @@ impl<T: ResourcePath> Path<T> {
/// Try to deserialize matching parameters to a specified type `U`
pub fn load<'de, U: serde::Deserialize<'de>>(&'de self) -> Result<U, de::value::Error> {
profile_method!(load);
de::Deserialize::deserialize(PathDeserializer::new(self))
}
}
@@ -182,7 +180,7 @@ impl<'a, T: ResourcePath> Iterator for PathIter<'a, T> {
#[inline]
fn next(&mut self) -> Option<(&'a str, &'a str)> {
if self.idx < self.params.len() {
if self.idx < self.params.segment_count() {
let idx = self.idx;
let res = match self.params.segments[idx].1 {
PathItem::Static(ref s) => &s,

File diff suppressed because it is too large Load Diff

View File

@@ -1,4 +1,6 @@
use crate::{IntoPattern, Resource, ResourceDef, ResourcePath};
use firestorm::profile_method;
use crate::{IntoPatterns, Resource, ResourceDef, ResourcePath};
#[derive(Debug, Copy, Clone, PartialEq)]
pub struct ResourceId(pub u16);
@@ -10,7 +12,11 @@ pub struct ResourceInfo {
}
/// Resource router.
pub struct Router<T, U = ()>(Vec<(ResourceDef, T, Option<U>)>);
// T is the resource itself
// U is any other data needed for routing like method guards
pub struct Router<T, U = ()> {
routes: Vec<(ResourceDef, T, Option<U>)>,
}
impl<T, U> Router<T, U> {
pub fn build() -> RouterBuilder<T, U> {
@@ -24,11 +30,14 @@ impl<T, U> Router<T, U> {
R: Resource<P>,
P: ResourcePath,
{
for item in self.0.iter() {
if item.0.match_path(resource.resource_path()) {
profile_method!(recognize);
for item in self.routes.iter() {
if item.0.capture_match_info(resource.resource_path()) {
return Some((&item.1, ResourceId(item.0.id())));
}
}
None
}
@@ -37,33 +46,35 @@ impl<T, U> Router<T, U> {
R: Resource<P>,
P: ResourcePath,
{
for item in self.0.iter_mut() {
if item.0.match_path(resource.resource_path()) {
profile_method!(recognize_mut);
for item in self.routes.iter_mut() {
if item.0.capture_match_info(resource.resource_path()) {
return Some((&mut item.1, ResourceId(item.0.id())));
}
}
None
}
pub fn recognize_checked<R, P, F>(
&self,
resource: &mut R,
check: F,
) -> Option<(&T, ResourceId)>
pub fn recognize_fn<R, P, F>(&self, resource: &mut R, check: F) -> Option<(&T, ResourceId)>
where
F: Fn(&R, &Option<U>) -> bool,
R: Resource<P>,
P: ResourcePath,
{
for item in self.0.iter() {
if item.0.match_path_checked(resource, &check, &item.2) {
profile_method!(recognize_checked);
for item in self.routes.iter() {
if item.0.capture_match_info_fn(resource, &check, &item.2) {
return Some((&item.1, ResourceId(item.0.id())));
}
}
None
}
pub fn recognize_mut_checked<R, P, F>(
pub fn recognize_mut_fn<R, P, F>(
&mut self,
resource: &mut R,
check: F,
@@ -73,11 +84,14 @@ impl<T, U> Router<T, U> {
R: Resource<P>,
P: ResourcePath,
{
for item in self.0.iter_mut() {
if item.0.match_path_checked(resource, &check, &item.2) {
profile_method!(recognize_mut_checked);
for item in self.routes.iter_mut() {
if item.0.capture_match_info_fn(resource, &check, &item.2) {
return Some((&mut item.1, ResourceId(item.0.id())));
}
}
None
}
}
@@ -88,11 +102,13 @@ pub struct RouterBuilder<T, U = ()> {
impl<T, U> RouterBuilder<T, U> {
/// Register resource for specified path.
pub fn path<P: IntoPattern>(
pub fn path<P: IntoPatterns>(
&mut self,
path: P,
resource: T,
) -> &mut (ResourceDef, T, Option<U>) {
profile_method!(path);
self.resources
.push((ResourceDef::new(path), resource, None));
self.resources.last_mut().unwrap()
@@ -100,6 +116,8 @@ impl<T, U> RouterBuilder<T, U> {
/// Register resource for specified path prefix.
pub fn prefix(&mut self, prefix: &str, resource: T) -> &mut (ResourceDef, T, Option<U>) {
profile_method!(prefix);
self.resources
.push((ResourceDef::prefix(prefix), resource, None));
self.resources.last_mut().unwrap()
@@ -107,13 +125,17 @@ impl<T, U> RouterBuilder<T, U> {
/// Register resource for ResourceDef
pub fn rdef(&mut self, rdef: ResourceDef, resource: T) -> &mut (ResourceDef, T, Option<U>) {
profile_method!(rdef);
self.resources.push((rdef, resource, None));
self.resources.last_mut().unwrap()
}
/// Finish configuration and create router instance.
pub fn finish(self) -> Router<T, U> {
Router(self.resources)
Router {
routes: self.resources,
}
}
}

View File

@@ -31,7 +31,7 @@ fn set_bit(array: &mut [u8], ch: u8) {
}
thread_local! {
static DEFAULT_QUOTER: Quoter = Quoter::new(b"@:", b"/+");
static DEFAULT_QUOTER: Quoter = Quoter::new(b"@:", b"%/+");
}
#[derive(Default, Clone, Debug)]
@@ -170,11 +170,7 @@ impl Quoter {
idx += 1;
}
cloned.map(|data| {
// SAFETY: we get data from http::Uri, which does UTF-8 checks already
// this code only decodes valid pct encoded values
unsafe { String::from_utf8_unchecked(data) }
})
cloned.map(|data| String::from_utf8_lossy(&data).into_owned())
}
}
@@ -204,24 +200,69 @@ mod tests {
use super::*;
use crate::{Path, ResourceDef};
const PROTECTED: &[u8] = b"%/+";
fn match_url(pattern: &'static str, url: impl AsRef<str>) -> Path<Url> {
let re = ResourceDef::new(pattern);
let uri = Uri::try_from(url.as_ref()).unwrap();
let mut path = Path::new(Url::new(uri));
assert!(re.capture_match_info(&mut path));
path
}
fn percent_encode(data: &[u8]) -> String {
data.iter().map(|c| format!("%{:02X}", c)).collect()
}
#[test]
fn test_parse_url() {
let re = ResourceDef::new("/user/{id}/test");
let re = "/user/{id}/test";
let url = Uri::try_from("/user/2345/test").unwrap();
let mut path = Path::new(Url::new(url));
assert!(re.match_path(&mut path));
let path = match_url(re, "/user/2345/test");
assert_eq!(path.get("id").unwrap(), "2345");
let url = Uri::try_from("/user/qwe%25/test").unwrap();
let mut path = Path::new(Url::new(url));
assert!(re.match_path(&mut path));
assert_eq!(path.get("id").unwrap(), "qwe%");
// "%25" should never be decoded into '%' to guarantee the output is a valid
// percent-encoded format
let path = match_url(re, "/user/qwe%25/test");
assert_eq!(path.get("id").unwrap(), "qwe%25");
let url = Uri::try_from("/user/qwe%25rty/test").unwrap();
let mut path = Path::new(Url::new(url));
assert!(re.match_path(&mut path));
assert_eq!(path.get("id").unwrap(), "qwe%rty");
let path = match_url(re, "/user/qwe%25rty/test");
assert_eq!(path.get("id").unwrap(), "qwe%25rty");
}
#[test]
fn test_protected_chars() {
let encoded = percent_encode(PROTECTED);
let path = match_url("/user/{id}/test", format!("/user/{}/test", encoded));
assert_eq!(path.get("id").unwrap(), &encoded);
}
#[test]
fn test_non_protecteed_ascii() {
let nonprotected_ascii = ('\u{0}'..='\u{7F}')
.filter(|&c| c.is_ascii() && !PROTECTED.contains(&(c as u8)))
.collect::<String>();
let encoded = percent_encode(nonprotected_ascii.as_bytes());
let path = match_url("/user/{id}/test", format!("/user/{}/test", encoded));
assert_eq!(path.get("id").unwrap(), &nonprotected_ascii);
}
#[test]
fn test_valid_utf8_multibyte() {
let test = ('\u{FF00}'..='\u{FFFF}').collect::<String>();
let encoded = percent_encode(test.as_bytes());
let path = match_url("/a/{id}/b", format!("/a/{}/b", &encoded));
assert_eq!(path.get("id").unwrap(), &test);
}
#[test]
fn test_invalid_utf8() {
let invalid_utf8 = percent_encode((0x80..=0xff).collect::<Vec<_>>().as_slice());
let uri = Uri::try_from(format!("/{}", invalid_utf8)).unwrap();
let path = Path::new(Url::new(uri));
// We should always get a valid utf8 string
assert!(String::from_utf8(path.path().as_bytes().to_owned()).is_ok());
}
#[test]

View File

@@ -8,7 +8,7 @@
`Ready` object in ok variant. [#293]
* Breakage is acceptable since `ActixStream` was not intended to be public.
[#293] https://github.com/actix/actix-net/pull/293
[#293]: https://github.com/actix/actix-net/pull/293
## 2.1.0 - 2021-02-24

View File

@@ -286,7 +286,7 @@ fn new_arbiter_with_tokio() {
arb.join().unwrap();
assert_eq!(false, counter.load(Ordering::SeqCst));
assert!(!counter.load(Ordering::SeqCst));
}
#[test]

View File

@@ -1,6 +1,13 @@
# Changes
## Unreleased - 2021-xx-xx
* Remove `config` module. `ServiceConfig`, `ServiceRuntime` public types are removed due to this change. [#349]
* Remove `ServerBuilder::configure` [#349]
[#349]: https://github.com/actix/actix-net/pull/349
## 2.0.0-beta.5 - 2021-04-20
* Server shutdown would notify all workers to exit regardless if shutdown is graceful.
This would make all worker shutdown immediately in force shutdown case. [#333]

View File

@@ -1,14 +1,13 @@
[package]
name = "actix-server"
version = "2.0.0-beta.4"
version = "2.0.0-beta.5"
authors = [
"Nikolay Kim <fafhrd91@gmail.com>",
"fakeshadow <24548779@qq.com>",
]
description = "General purpose TCP server built for the Actix ecosystem"
keywords = ["network", "framework", "async", "futures"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
repository = "https://github.com/actix/actix-net"
categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0"
edition = "2018"
@@ -29,7 +28,6 @@ futures-core = { version = "0.3.7", default-features = false, features = ["alloc
log = "0.4"
mio = { version = "0.7.6", features = ["os-poll", "net"] }
num_cpus = "1.13"
slab = "0.4"
tokio = { version = "1.2", features = ["sync"] }
[dev-dependencies]

View File

@@ -7,21 +7,14 @@ use actix_rt::{
};
use log::{error, info};
use mio::{Interest, Poll, Token as MioToken};
use slab::Slab;
use crate::server::Server;
use crate::socket::{MioListener, SocketAddr};
use crate::socket::MioListener;
use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN};
use crate::worker::{Conn, WorkerHandleAccept};
use crate::Token;
struct ServerSocketInfo {
/// Address of socket. Mainly used for logging.
addr: SocketAddr,
/// Beware this is the crate token for identify socket and should not be confused
/// with `mio::Token`.
token: Token,
token: usize,
lst: MioListener,
@@ -65,7 +58,7 @@ impl AcceptLoop {
pub(crate) fn start(
&mut self,
socks: Vec<(Token, MioListener)>,
socks: Vec<(usize, MioListener)>,
handles: Vec<WorkerHandleAccept>,
) {
let srv = self.srv.take().expect("Can not re-use AcceptInfo");
@@ -84,7 +77,7 @@ struct Accept {
srv: Server,
next: usize,
avail: Availability,
backpressure: bool,
paused: bool,
}
/// Array of u128 with every bit as marker for a worker handle's availability.
@@ -98,23 +91,22 @@ impl Default for Availability {
impl Availability {
/// Check if any worker handle is available
#[inline(always)]
fn available(&self) -> bool {
self.0.iter().any(|a| *a != 0)
}
/// Check if worker handle is available by index
#[inline(always)]
fn get_available(&self, idx: usize) -> bool {
let (offset, idx) = Self::offset(idx);
self.0[offset] & (1 << idx as u128) != 0
}
/// Set worker handle available state by index.
fn set_available(&mut self, idx: usize, avail: bool) {
let (offset, idx) = if idx < 128 {
(0, idx)
} else if idx < 128 * 2 {
(1, idx - 128)
} else if idx < 128 * 3 {
(2, idx - 128 * 2)
} else if idx < 128 * 4 {
(3, idx - 128 * 3)
} else {
panic!("Max WorkerHandle count is 512")
};
let (offset, idx) = Self::offset(idx);
let off = 1 << idx as u128;
if avail {
@@ -131,6 +123,21 @@ impl Availability {
self.set_available(handle.idx(), true);
})
}
/// Get offset and adjusted index of given worker handle index.
fn offset(idx: usize) -> (usize, usize) {
if idx < 128 {
(0, idx)
} else if idx < 128 * 2 {
(1, idx - 128)
} else if idx < 128 * 3 {
(2, idx - 128 * 2)
} else if idx < 128 * 4 {
(3, idx - 128 * 3)
} else {
panic!("Max WorkerHandle count is 512")
}
}
}
/// This function defines errors that are per-connection. Which basically
@@ -150,7 +157,7 @@ impl Accept {
pub(crate) fn start(
poll: Poll,
waker: WakerQueue,
socks: Vec<(Token, MioListener)>,
socks: Vec<(usize, MioListener)>,
srv: Server,
handles: Vec<WorkerHandleAccept>,
) {
@@ -161,10 +168,10 @@ impl Accept {
.name("actix-server accept loop".to_owned())
.spawn(move || {
System::set_current(sys);
let (mut accept, sockets) =
let (mut accept, mut sockets) =
Accept::new_with_sockets(poll, waker, socks, handles, srv);
accept.poll_with(sockets);
accept.poll_with(&mut sockets);
})
.unwrap();
}
@@ -172,29 +179,25 @@ impl Accept {
fn new_with_sockets(
poll: Poll,
waker: WakerQueue,
socks: Vec<(Token, MioListener)>,
socks: Vec<(usize, MioListener)>,
handles: Vec<WorkerHandleAccept>,
srv: Server,
) -> (Accept, Slab<ServerSocketInfo>) {
let mut sockets = Slab::new();
for (hnd_token, mut lst) in socks.into_iter() {
let addr = lst.local_addr();
) -> (Accept, Vec<ServerSocketInfo>) {
let sockets = socks
.into_iter()
.map(|(token, mut lst)| {
// Start listening for incoming connections
poll.registry()
.register(&mut lst, MioToken(token), Interest::READABLE)
.unwrap_or_else(|e| panic!("Can not register io: {}", e));
let entry = sockets.vacant_entry();
let token = entry.key();
// Start listening for incoming connections
poll.registry()
.register(&mut lst, MioToken(token), Interest::READABLE)
.unwrap_or_else(|e| panic!("Can not register io: {}", e));
entry.insert(ServerSocketInfo {
addr,
token: hnd_token,
lst,
timeout: None,
});
}
ServerSocketInfo {
token,
lst,
timeout: None,
}
})
.collect();
let mut avail = Availability::default();
@@ -208,19 +211,19 @@ impl Accept {
srv,
next: 0,
avail,
backpressure: false,
paused: false,
};
(accept, sockets)
}
fn poll_with(&mut self, mut sockets: Slab<ServerSocketInfo>) {
fn poll_with(&mut self, sockets: &mut [ServerSocketInfo]) {
let mut events = mio::Events::with_capacity(128);
loop {
if let Err(e) = self.poll.poll(&mut events, None) {
match e.kind() {
std::io::ErrorKind::Interrupted => continue,
io::ErrorKind::Interrupted => {}
_ => panic!("Poll error: {}", e),
}
}
@@ -228,130 +231,160 @@ impl Accept {
for event in events.iter() {
let token = event.token();
match token {
// This is a loop because interests for command from previous version was
// a loop that would try to drain the command channel. It's yet unknown
// if it's necessary/good practice to actively drain the waker queue.
WAKER_TOKEN => 'waker: loop {
// take guard with every iteration so no new interest can be added
// until the current task is done.
let mut guard = self.waker.guard();
match guard.pop_front() {
// worker notify it becomes available. we may want to recover
// from backpressure.
Some(WakerInterest::WorkerAvailable(idx)) => {
drop(guard);
self.maybe_backpressure(&mut sockets, false);
self.avail.set_available(idx, true);
}
// a new worker thread is made and it's handle would be added to Accept
Some(WakerInterest::Worker(handle)) => {
drop(guard);
// maybe we want to recover from a backpressure.
self.maybe_backpressure(&mut sockets, false);
self.avail.set_available(handle.idx(), true);
self.handles.push(handle);
}
// got timer interest and it's time to try register socket(s) again
Some(WakerInterest::Timer) => {
drop(guard);
self.process_timer(&mut sockets)
}
Some(WakerInterest::Pause) => {
drop(guard);
self.deregister_all(&mut sockets);
}
Some(WakerInterest::Resume) => {
drop(guard);
sockets.iter_mut().for_each(|(token, info)| {
self.register_logged(token, info);
});
}
Some(WakerInterest::Stop) => {
return self.deregister_all(&mut sockets);
}
// waker queue is drained
None => {
// Reset the WakerQueue before break so it does not grow infinitely
WakerQueue::reset(&mut guard);
break 'waker;
}
WAKER_TOKEN => {
let exit = self.handle_waker(sockets);
if exit {
info!("Accept is stopped.");
return;
}
},
}
_ => {
let token = usize::from(token);
self.accept(&mut sockets, token);
self.accept(sockets, token);
}
}
}
}
}
fn process_timer(&self, sockets: &mut Slab<ServerSocketInfo>) {
fn handle_waker(&mut self, sockets: &mut [ServerSocketInfo]) -> bool {
// This is a loop because interests for command from previous version was
// a loop that would try to drain the command channel. It's yet unknown
// if it's necessary/good practice to actively drain the waker queue.
loop {
// take guard with every iteration so no new interest can be added
// until the current task is done.
let mut guard = self.waker.guard();
match guard.pop_front() {
// worker notify it becomes available.
Some(WakerInterest::WorkerAvailable(idx)) => {
drop(guard);
self.avail.set_available(idx, true);
if !self.paused {
self.accept_all(sockets);
}
}
// a new worker thread is made and it's handle would be added to Accept
Some(WakerInterest::Worker(handle)) => {
drop(guard);
self.avail.set_available(handle.idx(), true);
self.handles.push(handle);
if !self.paused {
self.accept_all(sockets);
}
}
// got timer interest and it's time to try register socket(s) again
Some(WakerInterest::Timer) => {
drop(guard);
self.process_timer(sockets)
}
Some(WakerInterest::Pause) => {
drop(guard);
if !self.paused {
self.paused = true;
self.deregister_all(sockets);
}
}
Some(WakerInterest::Resume) => {
drop(guard);
if self.paused {
self.paused = false;
sockets.iter_mut().for_each(|info| {
self.register_logged(info);
});
self.accept_all(sockets);
}
}
Some(WakerInterest::Stop) => {
if !self.paused {
self.deregister_all(sockets);
}
return true;
}
// waker queue is drained
None => {
// Reset the WakerQueue before break so it does not grow infinitely
WakerQueue::reset(&mut guard);
return false;
}
}
}
}
fn process_timer(&self, sockets: &mut [ServerSocketInfo]) {
let now = Instant::now();
sockets
.iter_mut()
// Only sockets that had an associated timeout were deregistered.
.filter(|(_, info)| info.timeout.is_some())
.for_each(|(token, info)| {
.filter(|info| info.timeout.is_some())
.for_each(|info| {
let inst = info.timeout.take().unwrap();
if now < inst {
info.timeout = Some(inst);
} else if !self.backpressure {
self.register_logged(token, info);
} else if !self.paused {
self.register_logged(info);
}
// Drop the timeout if server is in backpressure and socket timeout is expired.
// When server recovers from backpressure it will register all sockets without
// Drop the timeout if server is paused and socket timeout is expired.
// When server recovers from pause it will register all sockets without
// a timeout value so this socket register will be delayed till then.
});
}
#[cfg(not(target_os = "windows"))]
fn register(&self, token: usize, info: &mut ServerSocketInfo) -> io::Result<()> {
fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> {
let token = MioToken(info.token);
self.poll
.registry()
.register(&mut info.lst, MioToken(token), Interest::READABLE)
.register(&mut info.lst, token, Interest::READABLE)
}
#[cfg(target_os = "windows")]
fn register(&self, token: usize, info: &mut ServerSocketInfo) -> io::Result<()> {
fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> {
// On windows, calling register without deregister cause an error.
// See https://github.com/actix/actix-web/issues/905
// Calling reregister seems to fix the issue.
let token = MioToken(info.token);
self.poll
.registry()
.register(&mut info.lst, mio::Token(token), Interest::READABLE)
.register(&mut info.lst, token, Interest::READABLE)
.or_else(|_| {
self.poll.registry().reregister(
&mut info.lst,
mio::Token(token),
Interest::READABLE,
)
self.poll
.registry()
.reregister(&mut info.lst, token, Interest::READABLE)
})
}
fn register_logged(&self, token: usize, info: &mut ServerSocketInfo) {
match self.register(token, info) {
Ok(_) => info!("Resume accepting connections on {}", info.addr),
fn register_logged(&self, info: &mut ServerSocketInfo) {
match self.register(info) {
Ok(_) => info!("Resume accepting connections on {}", info.lst.local_addr()),
Err(e) => error!("Can not register server socket {}", e),
}
}
fn deregister(&self, info: &mut ServerSocketInfo) -> io::Result<()> {
self.poll.registry().deregister(&mut info.lst)
}
fn deregister_logged(&self, info: &mut ServerSocketInfo) {
match self.deregister(info) {
Ok(_) => info!("Paused accepting connections on {}", info.addr),
match self.poll.registry().deregister(&mut info.lst) {
Ok(_) => info!("Paused accepting connections on {}", info.lst.local_addr()),
Err(e) => {
error!("Can not deregister server socket {}", e)
}
}
}
fn deregister_all(&self, sockets: &mut Slab<ServerSocketInfo>) {
fn deregister_all(&self, sockets: &mut [ServerSocketInfo]) {
// This is a best effort implementation with following limitation:
//
// Every ServerSocketInfo with associate timeout will be skipped and it's timeout
@@ -364,70 +397,23 @@ impl Accept {
.iter_mut()
// Take all timeout.
// This is to prevent Accept::process_timer method re-register a socket afterwards.
.map(|(_, info)| (info.timeout.take(), info))
.map(|info| (info.timeout.take(), info))
// Socket info with a timeout is already deregistered so skip them.
.filter(|(timeout, _)| timeout.is_none())
.for_each(|(_, info)| self.deregister_logged(info));
}
fn maybe_backpressure(&mut self, sockets: &mut Slab<ServerSocketInfo>, on: bool) {
// Only operate when server is in a different backpressure than the given flag.
if self.backpressure != on {
self.backpressure = on;
sockets
.iter_mut()
// Only operate on sockets without associated timeout.
// Sockets with it should be handled by `accept` and `process_timer` methods.
// They are already deregistered or need to be reregister in the future.
.filter(|(_, info)| info.timeout.is_none())
.for_each(|(token, info)| {
if on {
self.deregister_logged(info);
} else {
self.register_logged(token, info);
}
});
}
}
fn accept_one(&mut self, sockets: &mut Slab<ServerSocketInfo>, mut conn: Conn) {
if self.backpressure {
// send_connection would remove fault worker from handles.
// worst case here is conn get dropped after all handles are gone.
while let Err(c) = self.send_connection(sockets, conn) {
conn = c
}
} else {
while self.avail.available() {
let next = self.next();
let idx = next.idx();
if next.available() {
self.avail.set_available(idx, true);
match self.send_connection(sockets, conn) {
Ok(_) => return,
Err(c) => conn = c,
}
} else {
self.avail.set_available(idx, false);
self.set_next();
}
}
// Sending Conn failed due to either all workers are in error or not available.
// Enter backpressure state and try again.
self.maybe_backpressure(sockets, true);
self.accept_one(sockets, conn);
}
}
// Send connection to worker and handle error.
fn send_connection(
&mut self,
sockets: &mut Slab<ServerSocketInfo>,
conn: Conn,
) -> Result<(), Conn> {
match self.next().send(conn) {
fn send_connection(&mut self, conn: Conn) -> Result<(), Conn> {
let next = self.next();
match next.send(conn) {
Ok(_) => {
// Increment counter of WorkerHandle.
// Set worker to unavailable with it hit max (Return false).
if !next.inc_counter() {
let idx = next.idx();
self.avail.set_available(idx, false);
}
self.set_next();
Ok(())
}
@@ -438,7 +424,6 @@ impl Accept {
if self.handles.is_empty() {
error!("No workers");
self.maybe_backpressure(sockets, true);
// All workers are gone and Conn is nowhere to be sent.
// Treat this situation as Ok and drop Conn.
return Ok(());
@@ -451,19 +436,38 @@ impl Accept {
}
}
fn accept(&mut self, sockets: &mut Slab<ServerSocketInfo>, token: usize) {
fn accept_one(&mut self, mut conn: Conn) {
loop {
let info = sockets
.get_mut(token)
.expect("ServerSocketInfo is removed from Slab");
let next = self.next();
let idx = next.idx();
if self.avail.get_available(idx) {
match self.send_connection(conn) {
Ok(_) => return,
Err(c) => conn = c,
}
} else {
self.avail.set_available(idx, false);
self.set_next();
if !self.avail.available() {
while let Err(c) = self.send_connection(conn) {
conn = c;
}
return;
}
}
}
}
fn accept(&mut self, sockets: &mut [ServerSocketInfo], token: usize) {
while self.avail.available() {
let info = &mut sockets[token];
match info.lst.accept() {
Ok(io) => {
let msg = Conn {
io,
token: info.token,
};
self.accept_one(sockets, msg);
let conn = Conn { io, token };
self.accept_one(conn);
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return,
Err(ref e) if connection_error(e) => continue,
@@ -491,11 +495,22 @@ impl Accept {
}
}
fn accept_all(&mut self, sockets: &mut [ServerSocketInfo]) {
sockets
.iter_mut()
.map(|info| info.token)
.collect::<Vec<_>>()
.into_iter()
.for_each(|idx| self.accept(sockets, idx))
}
#[inline(always)]
fn next(&self) -> &WorkerHandleAccept {
&self.handles[self.next]
}
/// Set next worker handle that would accept connection.
#[inline(always)]
fn set_next(&mut self) {
self.next = (self.next + 1) % self.handles.len();
}

View File

@@ -8,31 +8,29 @@ use std::{
use actix_rt::{self as rt, net::TcpStream, time::sleep, System};
use log::{error, info};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
use tokio::sync::oneshot;
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver},
oneshot,
};
use crate::accept::AcceptLoop;
use crate::config::{ConfiguredService, ServiceConfig};
use crate::join_all;
use crate::server::{Server, ServerCommand};
use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService};
use crate::signals::{Signal, Signals};
use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
use crate::socket::{MioTcpListener, MioTcpSocket};
use crate::waker_queue::{WakerInterest, WakerQueue};
use crate::worker::{
ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandleAccept,
WorkerHandleServer,
};
use crate::{join_all, Token};
use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer};
/// Server builder
pub struct ServerBuilder {
threads: usize,
token: Token,
token: usize,
backlog: u32,
handles: Vec<(usize, WorkerHandleServer)>,
services: Vec<Box<dyn InternalServiceFactory>>,
sockets: Vec<(Token, String, MioListener)>,
sockets: Vec<(usize, String, MioListener)>,
accept: AcceptLoop,
exit: bool,
no_signals: bool,
@@ -56,7 +54,7 @@ impl ServerBuilder {
ServerBuilder {
threads: num_cpus::get(),
token: Token::default(),
token: 0,
handles: Vec::new(),
services: Vec::new(),
sockets: Vec::new(),
@@ -149,32 +147,6 @@ impl ServerBuilder {
self
}
/// Execute external configuration as part of the server building process.
///
/// This function is useful for moving parts of configuration to a different module or
/// even library.
pub fn configure<F>(mut self, f: F) -> io::Result<ServerBuilder>
where
F: Fn(&mut ServiceConfig) -> io::Result<()>,
{
let mut cfg = ServiceConfig::new(self.threads, self.backlog);
f(&mut cfg)?;
if let Some(apply) = cfg.apply {
let mut srv = ConfiguredService::new(apply);
for (name, lst) in cfg.services {
let token = self.token.next();
srv.stream(token, name.clone(), lst.local_addr()?);
self.sockets.push((token, name, MioListener::Tcp(lst)));
}
self.services.push(Box::new(srv));
}
self.threads = cfg.threads;
Ok(self)
}
/// Add new service to the server.
pub fn bind<F, U, N: AsRef<str>>(mut self, name: N, addr: U, factory: F) -> io::Result<Self>
where
@@ -184,7 +156,7 @@ impl ServerBuilder {
let sockets = bind_addr(addr, self.backlog)?;
for lst in sockets {
let token = self.token.next();
let token = self.next_token();
self.services.push(StreamNewService::create(
name.as_ref().to_string(),
token,
@@ -233,7 +205,7 @@ impl ServerBuilder {
{
use std::net::{IpAddr, Ipv4Addr};
lst.set_nonblocking(true)?;
let token = self.token.next();
let token = self.next_token();
let addr = StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
self.services.push(StreamNewService::create(
name.as_ref().to_string(),
@@ -259,7 +231,7 @@ impl ServerBuilder {
lst.set_nonblocking(true)?;
let addr = lst.local_addr()?;
let token = self.token.next();
let token = self.next_token();
self.services.push(StreamNewService::create(
name.as_ref().to_string(),
token,
@@ -318,12 +290,11 @@ impl ServerBuilder {
fn start_worker(
&self,
idx: usize,
waker: WakerQueue,
waker_queue: WakerQueue,
) -> (WorkerHandleAccept, WorkerHandleServer) {
let avail = WorkerAvailability::new(idx, waker);
let services = self.services.iter().map(|v| v.clone_factory()).collect();
ServerWorker::start(idx, services, avail, self.worker_config)
ServerWorker::start(idx, services, waker_queue, self.worker_config)
}
fn handle_cmd(&mut self, item: ServerCommand) {
@@ -437,6 +408,12 @@ impl ServerBuilder {
}
}
}
fn next_token(&mut self) -> usize {
let token = self.token;
self.token += 1;
token
}
}
impl Future for ServerBuilder {

View File

@@ -1,287 +0,0 @@
use std::collections::HashMap;
use std::future::Future;
use std::{fmt, io};
use actix_rt::net::TcpStream;
use actix_service::{
fn_service, IntoServiceFactory as IntoBaseServiceFactory,
ServiceFactory as BaseServiceFactory,
};
use actix_utils::{counter::CounterGuard, future::ready};
use futures_core::future::LocalBoxFuture;
use log::error;
use crate::builder::bind_addr;
use crate::service::{BoxedServerService, InternalServiceFactory, StreamService};
use crate::socket::{MioStream, MioTcpListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
use crate::Token;
pub struct ServiceConfig {
pub(crate) services: Vec<(String, MioTcpListener)>,
pub(crate) apply: Option<Box<dyn ServiceRuntimeConfiguration>>,
pub(crate) threads: usize,
pub(crate) backlog: u32,
}
impl ServiceConfig {
pub(super) fn new(threads: usize, backlog: u32) -> ServiceConfig {
ServiceConfig {
threads,
backlog,
services: Vec::new(),
apply: None,
}
}
/// Set number of workers to start.
///
/// By default server uses number of available logical cpu as workers
/// count.
pub fn workers(&mut self, num: usize) {
self.threads = num;
}
/// Add new service to server
pub fn bind<U, N: AsRef<str>>(&mut self, name: N, addr: U) -> io::Result<&mut Self>
where
U: ToSocketAddrs,
{
let sockets = bind_addr(addr, self.backlog)?;
for lst in sockets {
self._listen(name.as_ref(), lst);
}
Ok(self)
}
/// Add new service to server
pub fn listen<N: AsRef<str>>(&mut self, name: N, lst: StdTcpListener) -> &mut Self {
self._listen(name, MioTcpListener::from_std(lst))
}
/// Register service configuration function. This function get called
/// during worker runtime configuration. It get executed in worker thread.
pub fn apply<F>(&mut self, f: F) -> io::Result<()>
where
F: Fn(&mut ServiceRuntime) + Send + Clone + 'static,
{
self.apply = Some(Box::new(f));
Ok(())
}
fn _listen<N: AsRef<str>>(&mut self, name: N, lst: MioTcpListener) -> &mut Self {
if self.apply.is_none() {
self.apply = Some(Box::new(not_configured));
}
self.services.push((name.as_ref().to_string(), lst));
self
}
}
pub(super) struct ConfiguredService {
rt: Box<dyn ServiceRuntimeConfiguration>,
names: HashMap<Token, (String, StdSocketAddr)>,
topics: HashMap<String, Token>,
services: Vec<Token>,
}
impl ConfiguredService {
pub(super) fn new(rt: Box<dyn ServiceRuntimeConfiguration>) -> Self {
ConfiguredService {
rt,
names: HashMap::new(),
topics: HashMap::new(),
services: Vec::new(),
}
}
pub(super) fn stream(&mut self, token: Token, name: String, addr: StdSocketAddr) {
self.names.insert(token, (name.clone(), addr));
self.topics.insert(name, token);
self.services.push(token);
}
}
impl InternalServiceFactory for ConfiguredService {
fn name(&self, token: Token) -> &str {
&self.names[&token].0
}
fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
Box::new(Self {
rt: self.rt.clone(),
names: self.names.clone(),
topics: self.topics.clone(),
services: self.services.clone(),
})
}
fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> {
// configure services
let mut rt = ServiceRuntime::new(self.topics.clone());
self.rt.configure(&mut rt);
rt.validate();
let mut names = self.names.clone();
let tokens = self.services.clone();
// construct services
Box::pin(async move {
let mut services = rt.services;
// TODO: Proper error handling here
for f in rt.onstart.into_iter() {
f.await;
}
let mut res = vec![];
for token in tokens {
if let Some(srv) = services.remove(&token) {
let newserv = srv.new_service(());
match newserv.await {
Ok(serv) => {
res.push((token, serv));
}
Err(_) => {
error!("Can not construct service");
return Err(());
}
}
} else {
let name = names.remove(&token).unwrap().0;
res.push((
token,
Box::new(StreamService::new(fn_service(move |_: TcpStream| {
error!("Service {:?} is not configured", name);
ready::<Result<_, ()>>(Ok(()))
}))),
));
};
}
Ok(res)
})
}
}
pub(super) trait ServiceRuntimeConfiguration: Send {
fn clone(&self) -> Box<dyn ServiceRuntimeConfiguration>;
fn configure(&self, rt: &mut ServiceRuntime);
}
impl<F> ServiceRuntimeConfiguration for F
where
F: Fn(&mut ServiceRuntime) + Send + Clone + 'static,
{
fn clone(&self) -> Box<dyn ServiceRuntimeConfiguration> {
Box::new(self.clone())
}
fn configure(&self, rt: &mut ServiceRuntime) {
(self)(rt)
}
}
fn not_configured(_: &mut ServiceRuntime) {
error!("Service is not configured");
}
pub struct ServiceRuntime {
names: HashMap<String, Token>,
services: HashMap<Token, BoxedNewService>,
onstart: Vec<LocalBoxFuture<'static, ()>>,
}
impl ServiceRuntime {
fn new(names: HashMap<String, Token>) -> Self {
ServiceRuntime {
names,
services: HashMap::new(),
onstart: Vec::new(),
}
}
fn validate(&self) {
for (name, token) in &self.names {
if !self.services.contains_key(&token) {
error!("Service {:?} is not configured", name);
}
}
}
/// Register service.
///
/// Name of the service must be registered during configuration stage with
/// *ServiceConfig::bind()* or *ServiceConfig::listen()* methods.
pub fn service<T, F>(&mut self, name: &str, service: F)
where
F: IntoBaseServiceFactory<T, TcpStream>,
T: BaseServiceFactory<TcpStream, Config = ()> + 'static,
T::Future: 'static,
T::Service: 'static,
T::InitError: fmt::Debug,
{
// let name = name.to_owned();
if let Some(token) = self.names.get(name) {
self.services.insert(
*token,
Box::new(ServiceFactory {
inner: service.into_factory(),
}),
);
} else {
panic!("Unknown service: {:?}", name);
}
}
/// Execute future before services initialization.
pub fn on_start<F>(&mut self, fut: F)
where
F: Future<Output = ()> + 'static,
{
self.onstart.push(Box::pin(fut))
}
}
type BoxedNewService = Box<
dyn BaseServiceFactory<
(CounterGuard, MioStream),
Response = (),
Error = (),
InitError = (),
Config = (),
Service = BoxedServerService,
Future = LocalBoxFuture<'static, Result<BoxedServerService, ()>>,
>,
>;
struct ServiceFactory<T> {
inner: T,
}
impl<T> BaseServiceFactory<(CounterGuard, MioStream)> for ServiceFactory<T>
where
T: BaseServiceFactory<TcpStream, Config = ()>,
T::Future: 'static,
T::Service: 'static,
T::Error: 'static,
T::InitError: fmt::Debug + 'static,
{
type Response = ();
type Error = ();
type Config = ();
type Service = BoxedServerService;
type InitError = ();
type Future = LocalBoxFuture<'static, Result<BoxedServerService, ()>>;
fn new_service(&self, _: ()) -> Self::Future {
let fut = self.inner.new_service(());
Box::pin(async move {
match fut.await {
Ok(s) => Ok(Box::new(StreamService::new(s)) as BoxedServerService),
Err(e) => {
error!("Can not construct service: {:?}", e);
Err(())
}
}
})
}
}

View File

@@ -6,7 +6,6 @@
mod accept;
mod builder;
mod config;
mod server;
mod service;
mod signals;
@@ -16,7 +15,6 @@ mod waker_queue;
mod worker;
pub use self::builder::ServerBuilder;
pub use self::config::{ServiceConfig, ServiceRuntime};
pub use self::server::Server;
pub use self::service::ServiceFactory;
pub use self::test_server::TestServer;
@@ -28,28 +26,6 @@ use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Socket ID token
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub(crate) struct Token(usize);
impl Default for Token {
fn default() -> Self {
Self::new()
}
}
impl Token {
fn new() -> Self {
Self(0)
}
pub(crate) fn next(&mut self) -> Token {
let token = Token(self.0);
self.0 += 1;
token
}
}
/// Start server building process
pub fn new() -> ServerBuilder {
ServerBuilder::default()

View File

@@ -3,15 +3,12 @@ use std::net::SocketAddr;
use std::task::{Context, Poll};
use actix_service::{Service, ServiceFactory as BaseServiceFactory};
use actix_utils::{
counter::CounterGuard,
future::{ready, Ready},
};
use actix_utils::future::{ready, Ready};
use futures_core::future::LocalBoxFuture;
use log::error;
use crate::socket::{FromStream, MioStream};
use crate::Token;
use crate::worker::WorkerCounterGuard;
pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static {
type Factory: BaseServiceFactory<Stream, Config = ()>;
@@ -20,16 +17,16 @@ pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static {
}
pub(crate) trait InternalServiceFactory: Send {
fn name(&self, token: Token) -> &str;
fn name(&self, token: usize) -> &str;
fn clone_factory(&self) -> Box<dyn InternalServiceFactory>;
fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>;
fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>;
}
pub(crate) type BoxedServerService = Box<
dyn Service<
(CounterGuard, MioStream),
(WorkerCounterGuard, MioStream),
Response = (),
Error = (),
Future = Ready<Result<(), ()>>,
@@ -50,7 +47,7 @@ impl<S, I> StreamService<S, I> {
}
}
impl<S, I> Service<(CounterGuard, MioStream)> for StreamService<S, I>
impl<S, I> Service<(WorkerCounterGuard, MioStream)> for StreamService<S, I>
where
S: Service<I>,
S::Future: 'static,
@@ -65,7 +62,7 @@ where
self.service.poll_ready(ctx).map_err(|_| ())
}
fn call(&self, (guard, req): (CounterGuard, MioStream)) -> Self::Future {
fn call(&self, (guard, req): (WorkerCounterGuard, MioStream)) -> Self::Future {
ready(match FromStream::from_mio(req) {
Ok(stream) => {
let f = self.service.call(stream);
@@ -86,7 +83,7 @@ where
pub(crate) struct StreamNewService<F: ServiceFactory<Io>, Io: FromStream> {
name: String,
inner: F,
token: Token,
token: usize,
addr: SocketAddr,
_t: PhantomData<Io>,
}
@@ -98,7 +95,7 @@ where
{
pub(crate) fn create(
name: String,
token: Token,
token: usize,
inner: F,
addr: SocketAddr,
) -> Box<dyn InternalServiceFactory> {
@@ -117,7 +114,7 @@ where
F: ServiceFactory<Io>,
Io: FromStream + Send + 'static,
{
fn name(&self, _: Token) -> &str {
fn name(&self, _: usize) -> &str {
&self.name
}
@@ -131,14 +128,14 @@ where
})
}
fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> {
fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>> {
let token = self.token;
let fut = self.inner.create().new_service(());
Box::pin(async move {
match fut.await {
Ok(inner) => {
let service = Box::new(StreamService::new(inner)) as _;
Ok(vec![(token, service)])
Ok((token, service))
}
Err(_) => Err(()),
}

View File

@@ -23,9 +23,15 @@ pub(crate) enum MioListener {
impl MioListener {
pub(crate) fn local_addr(&self) -> SocketAddr {
match *self {
MioListener::Tcp(ref lst) => SocketAddr::Tcp(lst.local_addr().unwrap()),
MioListener::Tcp(ref lst) => lst
.local_addr()
.map(SocketAddr::Tcp)
.unwrap_or(SocketAddr::Unknown),
#[cfg(unix)]
MioListener::Uds(ref lst) => SocketAddr::Uds(lst.local_addr().unwrap()),
MioListener::Uds(ref lst) => lst
.local_addr()
.map(SocketAddr::Uds)
.unwrap_or(SocketAddr::Unknown),
}
}
@@ -110,14 +116,15 @@ impl fmt::Debug for MioListener {
impl fmt::Display for MioListener {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
MioListener::Tcp(ref lst) => write!(f, "{}", lst.local_addr().ok().unwrap()),
MioListener::Tcp(ref lst) => write!(f, "{:?}", lst),
#[cfg(unix)]
MioListener::Uds(ref lst) => write!(f, "{:?}", lst.local_addr().ok().unwrap()),
MioListener::Uds(ref lst) => write!(f, "{:?}", lst),
}
}
}
pub(crate) enum SocketAddr {
Unknown,
Tcp(StdSocketAddr),
#[cfg(unix)]
Uds(mio::net::SocketAddr),
@@ -126,9 +133,10 @@ pub(crate) enum SocketAddr {
impl fmt::Display for SocketAddr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
SocketAddr::Tcp(ref addr) => write!(f, "{}", addr),
Self::Unknown => write!(f, "Unknown SocketAddr"),
Self::Tcp(ref addr) => write!(f, "{}", addr),
#[cfg(unix)]
SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr),
Self::Uds(ref addr) => write!(f, "{:?}", addr),
}
}
}
@@ -136,9 +144,10 @@ impl fmt::Display for SocketAddr {
impl fmt::Debug for SocketAddr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
SocketAddr::Tcp(ref addr) => write!(f, "{:?}", addr),
Self::Unknown => write!(f, "Unknown SocketAddr"),
Self::Tcp(ref addr) => write!(f, "{:?}", addr),
#[cfg(unix)]
SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr),
Self::Uds(ref addr) => write!(f, "{:?}", addr),
}
}
}

View File

@@ -2,8 +2,9 @@ use std::{
future::Future,
mem,
pin::Pin,
rc::Rc,
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicUsize, Ordering},
Arc,
},
task::{Context, Poll},
@@ -15,7 +16,6 @@ use actix_rt::{
time::{sleep, Instant, Sleep},
Arbiter,
};
use actix_utils::counter::Counter;
use futures_core::{future::LocalBoxFuture, ready};
use log::{error, info, trace};
use tokio::sync::{
@@ -23,10 +23,10 @@ use tokio::sync::{
oneshot,
};
use crate::join_all;
use crate::service::{BoxedServerService, InternalServiceFactory};
use crate::socket::MioStream;
use crate::waker_queue::{WakerInterest, WakerQueue};
use crate::{join_all, Token};
/// Stop worker message. Returns `true` on successful graceful shutdown.
/// and `false` if some connections still alive when shutdown execute.
@@ -38,35 +38,131 @@ pub(crate) struct Stop {
#[derive(Debug)]
pub(crate) struct Conn {
pub io: MioStream,
pub token: Token,
pub token: usize,
}
fn handle_pair(
idx: usize,
tx1: UnboundedSender<Conn>,
tx2: UnboundedSender<Stop>,
avail: WorkerAvailability,
counter: Counter,
) -> (WorkerHandleAccept, WorkerHandleServer) {
let accept = WorkerHandleAccept { tx: tx1, avail };
let accept = WorkerHandleAccept {
idx,
tx: tx1,
counter,
};
let server = WorkerHandleServer { idx, tx: tx2 };
(accept, server)
}
/// counter: Arc<AtomicUsize> field is owned by `Accept` thread and `ServerWorker` thread.
///
/// `Accept` would increment the counter and `ServerWorker` would decrement it.
///
/// # Atomic Ordering:
///
/// `Accept` always look into it's cached `Availability` field for `ServerWorker` state.
/// It lazily increment counter after successful dispatching new work to `ServerWorker`.
/// On reaching counter limit `Accept` update it's cached `Availability` and mark worker as
/// unable to accept any work.
///
/// `ServerWorker` always decrement the counter when every work received from `Accept` is done.
/// On reaching counter limit worker would use `mio::Waker` and `WakerQueue` to wake up `Accept`
/// and notify it to update cached `Availability` again to mark worker as able to accept work again.
///
/// Hence, a wake up would only happen after `Accept` increment it to limit.
/// And a decrement to limit always wake up `Accept`.
#[derive(Clone)]
pub(crate) struct Counter {
counter: Arc<AtomicUsize>,
limit: usize,
}
impl Counter {
pub(crate) fn new(limit: usize) -> Self {
Self {
counter: Arc::new(AtomicUsize::new(1)),
limit,
}
}
/// Increment counter by 1 and return true when hitting limit
#[inline(always)]
pub(crate) fn inc(&self) -> bool {
self.counter.fetch_add(1, Ordering::Relaxed) != self.limit
}
/// Decrement counter by 1 and return true if crossing limit.
#[inline(always)]
pub(crate) fn dec(&self) -> bool {
self.counter.fetch_sub(1, Ordering::Relaxed) == self.limit
}
pub(crate) fn total(&self) -> usize {
self.counter.load(Ordering::SeqCst) - 1
}
}
pub(crate) struct WorkerCounter {
idx: usize,
inner: Rc<(WakerQueue, Counter)>,
}
impl Clone for WorkerCounter {
fn clone(&self) -> Self {
Self {
idx: self.idx,
inner: self.inner.clone(),
}
}
}
impl WorkerCounter {
pub(crate) fn new(idx: usize, waker_queue: WakerQueue, counter: Counter) -> Self {
Self {
idx,
inner: Rc::new((waker_queue, counter)),
}
}
#[inline(always)]
pub(crate) fn guard(&self) -> WorkerCounterGuard {
WorkerCounterGuard(self.clone())
}
fn total(&self) -> usize {
self.inner.1.total()
}
}
pub(crate) struct WorkerCounterGuard(WorkerCounter);
impl Drop for WorkerCounterGuard {
fn drop(&mut self) {
let (waker_queue, counter) = &*self.0.inner;
if counter.dec() {
waker_queue.wake(WakerInterest::WorkerAvailable(self.0.idx));
}
}
}
/// Handle to worker that can send connection message to worker and share the
/// availability of worker to other thread.
///
/// Held by [Accept](crate::accept::Accept).
pub(crate) struct WorkerHandleAccept {
idx: usize,
tx: UnboundedSender<Conn>,
avail: WorkerAvailability,
counter: Counter,
}
impl WorkerHandleAccept {
#[inline(always)]
pub(crate) fn idx(&self) -> usize {
self.avail.idx
self.idx
}
#[inline(always)]
@@ -75,16 +171,17 @@ impl WorkerHandleAccept {
}
#[inline(always)]
pub(crate) fn available(&self) -> bool {
self.avail.available()
pub(crate) fn inc_counter(&self) -> bool {
self.counter.inc()
}
}
/// Handle to worker than can send stop message to worker.
///
/// Held by [ServerBuilder](crate::builder::ServerBuilder).
#[derive(Debug)]
pub(crate) struct WorkerHandleServer {
pub idx: usize,
idx: usize,
tx: UnboundedSender<Stop>,
}
@@ -96,40 +193,6 @@ impl WorkerHandleServer {
}
}
#[derive(Clone)]
pub(crate) struct WorkerAvailability {
idx: usize,
waker: WakerQueue,
available: Arc<AtomicBool>,
}
impl WorkerAvailability {
pub fn new(idx: usize, waker: WakerQueue) -> Self {
WorkerAvailability {
idx,
waker,
available: Arc::new(AtomicBool::new(false)),
}
}
#[inline(always)]
pub fn available(&self) -> bool {
self.available.load(Ordering::Acquire)
}
pub fn set(&self, val: bool) {
// Ordering:
//
// There could be multiple set calls happen in one <ServerWorker as Future>::poll.
// Order is important between them.
let old = self.available.swap(val, Ordering::AcqRel);
// Notify the accept on switched to available.
if !old && val {
self.waker.wake(WakerInterest::WorkerAvailable(self.idx));
}
}
}
/// Service worker.
///
/// Worker accepts Socket objects via unbounded channel and starts stream processing.
@@ -138,9 +201,8 @@ pub(crate) struct ServerWorker {
// It must be dropped as soon as ServerWorker dropping.
rx: UnboundedReceiver<Conn>,
rx2: UnboundedReceiver<Stop>,
counter: WorkerCounter,
services: Box<[WorkerService]>,
availability: WorkerAvailability,
conns: Counter,
factories: Box<[Box<dyn InternalServiceFactory>]>,
state: WorkerState,
shutdown_timeout: Duration,
@@ -207,15 +269,15 @@ impl ServerWorker {
pub(crate) fn start(
idx: usize,
factories: Vec<Box<dyn InternalServiceFactory>>,
availability: WorkerAvailability,
waker_queue: WakerQueue,
config: ServerWorkerConfig,
) -> (WorkerHandleAccept, WorkerHandleServer) {
assert!(!availability.available());
let (tx1, rx) = unbounded_channel();
let (tx2, rx2) = unbounded_channel();
let avail = availability.clone();
let counter = Counter::new(config.max_concurrent_connections);
let counter_clone = counter.clone();
// every worker runs in it's own arbiter.
// use a custom tokio runtime builder to change the settings of runtime.
Arbiter::with_tokio_rt(move || {
@@ -231,11 +293,7 @@ impl ServerWorker {
.enumerate()
.map(|(idx, factory)| {
let fut = factory.create();
async move {
fut.await.map(|r| {
r.into_iter().map(|(t, s)| (idx, t, s)).collect::<Vec<_>>()
})
}
async move { fut.await.map(|(t, s)| (idx, t, s)) }
})
.collect::<Vec<_>>();
@@ -248,9 +306,8 @@ impl ServerWorker {
let services = match res {
Ok(res) => res
.into_iter()
.flatten()
.fold(Vec::new(), |mut services, (factory, token, service)| {
assert_eq!(token.0, services.len());
assert_eq!(token, services.len());
services.push(WorkerService {
factory,
service,
@@ -271,8 +328,7 @@ impl ServerWorker {
rx,
rx2,
services,
availability,
conns: Counter::new(config.max_concurrent_connections),
counter: WorkerCounter::new(idx, waker_queue, counter_clone),
factories: factories.into_boxed_slice(),
state: Default::default(),
shutdown_timeout: config.shutdown_timeout,
@@ -280,16 +336,16 @@ impl ServerWorker {
});
});
handle_pair(idx, tx1, tx2, avail)
handle_pair(idx, tx1, tx2, counter)
}
fn restart_service(&mut self, token: Token, factory_id: usize) {
fn restart_service(&mut self, idx: usize, factory_id: usize) {
let factory = &self.factories[factory_id];
trace!("Service {:?} failed, restarting", factory.name(token));
self.services[token.0].status = WorkerServiceStatus::Restarting;
trace!("Service {:?} failed, restarting", factory.name(idx));
self.services[idx].status = WorkerServiceStatus::Restarting;
self.state = WorkerState::Restarting(Restart {
factory_id,
token,
token: idx,
fut: factory.create(),
});
}
@@ -307,8 +363,8 @@ impl ServerWorker {
});
}
fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result<bool, (Token, usize)> {
let mut ready = self.conns.available(cx);
fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result<bool, (usize, usize)> {
let mut ready = true;
for (idx, srv) in self.services.iter_mut().enumerate() {
if srv.status == WorkerServiceStatus::Available
|| srv.status == WorkerServiceStatus::Unavailable
@@ -318,7 +374,7 @@ impl ServerWorker {
if srv.status == WorkerServiceStatus::Unavailable {
trace!(
"Service {:?} is available",
self.factories[srv.factory].name(Token(idx))
self.factories[srv.factory].name(idx)
);
srv.status = WorkerServiceStatus::Available;
}
@@ -329,7 +385,7 @@ impl ServerWorker {
if srv.status == WorkerServiceStatus::Available {
trace!(
"Service {:?} is unavailable",
self.factories[srv.factory].name(Token(idx))
self.factories[srv.factory].name(idx)
);
srv.status = WorkerServiceStatus::Unavailable;
}
@@ -337,10 +393,10 @@ impl ServerWorker {
Poll::Ready(Err(_)) => {
error!(
"Service {:?} readiness check returned error, restarting",
self.factories[srv.factory].name(Token(idx))
self.factories[srv.factory].name(idx)
);
srv.status = WorkerServiceStatus::Failed;
return Err((Token(idx), srv.factory));
return Err((idx, srv.factory));
}
}
}
@@ -359,8 +415,8 @@ enum WorkerState {
struct Restart {
factory_id: usize,
token: Token,
fut: LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>,
token: usize,
fut: LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>,
}
// Shutdown keep states necessary for server shutdown:
@@ -381,10 +437,6 @@ impl Default for WorkerState {
impl Drop for ServerWorker {
fn drop(&mut self) {
// Set availability to true so if accept try to send connection to this worker
// it would find worker is gone and remove it.
// This is helpful when worker is dropped unexpected.
self.availability.set(true);
// Stop the Arbiter ServerWorker runs on on drop.
Arbiter::current().stop();
}
@@ -399,8 +451,7 @@ impl Future for ServerWorker {
// `StopWorker` message handler
if let Poll::Ready(Some(Stop { graceful, tx })) = Pin::new(&mut this.rx2).poll_recv(cx)
{
this.availability.set(false);
let num = this.conns.total();
let num = this.counter.total();
if num == 0 {
info!("Shutting down worker, 0 connections");
let _ = tx.send(true);
@@ -427,7 +478,6 @@ impl Future for ServerWorker {
WorkerState::Unavailable => match this.check_readiness(cx) {
Ok(true) => {
this.state = WorkerState::Available;
this.availability.set(true);
self.poll(cx)
}
Ok(false) => Poll::Pending,
@@ -440,26 +490,22 @@ impl Future for ServerWorker {
let factory_id = restart.factory_id;
let token = restart.token;
let service = ready!(restart.fut.as_mut().poll(cx))
let (token_new, service) = ready!(restart.fut.as_mut().poll(cx))
.unwrap_or_else(|_| {
panic!(
"Can not restart {:?} service",
this.factories[factory_id].name(token)
)
})
.into_iter()
// Find the same token from vector. There should be only one
// So the first match would be enough.
.find(|(t, _)| *t == token)
.map(|(_, service)| service)
.expect("No BoxedServerService found");
});
assert_eq!(token, token_new);
trace!(
"Service {:?} has been restarted",
this.factories[factory_id].name(token)
);
this.services[token.0].created(service);
this.services[token].created(service);
this.state = WorkerState::Unavailable;
self.poll(cx)
@@ -468,7 +514,7 @@ impl Future for ServerWorker {
// Wait for 1 second.
ready!(shutdown.timer.as_mut().poll(cx));
if this.conns.total() == 0 {
if this.counter.total() == 0 {
// Graceful shutdown.
if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) {
let _ = shutdown.tx.send(true);
@@ -493,22 +539,20 @@ impl Future for ServerWorker {
Ok(true) => {}
Ok(false) => {
trace!("Worker is unavailable");
this.availability.set(false);
this.state = WorkerState::Unavailable;
return self.poll(cx);
}
Err((token, idx)) => {
this.restart_service(token, idx);
this.availability.set(false);
return self.poll(cx);
}
}
// handle incoming io stream
match ready!(Pin::new(&mut this.rx).poll_recv(cx)) {
// handle incoming io stream
Some(msg) => {
let guard = this.conns.get();
let _ = this.services[msg.token.0].service.call((guard, msg.io));
let guard = this.counter.guard();
let _ = this.services[msg.token].service.call((guard, msg.io));
}
None => return Poll::Ready(()),
};

View File

@@ -142,57 +142,6 @@ fn test_start() {
let _ = h.join();
}
#[test]
fn test_configure() {
let addr1 = unused_addr();
let addr2 = unused_addr();
let addr3 = unused_addr();
let (tx, rx) = mpsc::channel();
let num = Arc::new(AtomicUsize::new(0));
let num2 = num.clone();
let h = thread::spawn(move || {
let num = num2.clone();
let sys = actix_rt::System::new();
let srv = sys.block_on(lazy(|_| {
Server::build()
.disable_signals()
.configure(move |cfg| {
let num = num.clone();
let lst = net::TcpListener::bind(addr3).unwrap();
cfg.bind("addr1", addr1)
.unwrap()
.bind("addr2", addr2)
.unwrap()
.listen("addr3", lst)
.apply(move |rt| {
let num = num.clone();
rt.service("addr1", fn_service(|_| ok::<_, ()>(())));
rt.service("addr3", fn_service(|_| ok::<_, ()>(())));
rt.on_start(lazy(move |_| {
let _ = num.fetch_add(1, Ordering::Relaxed);
}))
})
})
.unwrap()
.workers(1)
.run()
}));
let _ = tx.send((srv, actix_rt::System::current()));
let _ = sys.run();
});
let (_, sys) = rx.recv().unwrap();
thread::sleep(Duration::from_millis(500));
assert!(net::TcpStream::connect(addr1).is_ok());
assert!(net::TcpStream::connect(addr2).is_ok());
assert!(net::TcpStream::connect(addr3).is_ok());
assert_eq!(num.load(Ordering::Relaxed), 1);
sys.stop();
let _ = h.join();
}
#[actix_rt::test]
async fn test_max_concurrent_connections() {
// Note:
@@ -305,81 +254,6 @@ async fn test_service_restart() {
let num_clone = num.clone();
let num2_clone = num2.clone();
let h = thread::spawn(move || {
actix_rt::System::new().block_on(async {
let server = Server::build()
.backlog(1)
.disable_signals()
.configure(move |cfg| {
let num = num.clone();
let num2 = num2.clone();
cfg.bind("addr1", addr1)
.unwrap()
.bind("addr2", addr2)
.unwrap()
.apply(move |rt| {
let num = num.clone();
let num2 = num2.clone();
rt.service(
"addr1",
fn_factory(move || {
let num = num.clone();
async move { Ok::<_, ()>(TestService(num)) }
}),
);
rt.service(
"addr2",
fn_factory(move || {
let num2 = num2.clone();
async move { Ok::<_, ()>(TestService(num2)) }
}),
);
})
})
.unwrap()
.workers(1)
.run();
let _ = tx.send((server.clone(), actix_rt::System::current()));
server.await
})
});
let (server, sys) = rx.recv().unwrap();
for _ in 0..5 {
TcpStream::connect(addr1)
.await
.unwrap()
.shutdown()
.await
.unwrap();
TcpStream::connect(addr2)
.await
.unwrap()
.shutdown()
.await
.unwrap();
}
sleep(Duration::from_secs(3)).await;
assert!(num_clone.load(Ordering::SeqCst) > 5);
assert!(num2_clone.load(Ordering::SeqCst) > 5);
sys.stop();
let _ = server.stop(false);
let _ = h.join().unwrap();
let addr1 = unused_addr();
let addr2 = unused_addr();
let (tx, rx) = mpsc::channel();
let num = Arc::new(AtomicUsize::new(0));
let num2 = Arc::new(AtomicUsize::new(0));
let num_clone = num.clone();
let num2_clone = num2.clone();
let h = thread::spawn(move || {
let num = num.clone();
actix_rt::System::new().block_on(async {

View File

@@ -1,5 +1,9 @@
/// An implementation of [`poll_ready`]() that always signals readiness.
///
/// This should only be used for basic leaf services that have no concept of un-readiness.
/// For wrapper or other serivice types, use [`forward_ready!`] for simple cases or write a bespoke
/// `poll_ready` implementation.
///
/// [`poll_ready`]: crate::Service::poll_ready
///
/// # Examples

View File

@@ -41,7 +41,7 @@ where
///
/// actix_service::forward_ready!(service);
///
/// fn call(&self, req: S::Request) -> Self::Future {
/// fn call(&self, req: Req) -> Self::Future {
/// TimeoutServiceResponse {
/// fut: self.service.call(req),
/// sleep: Sleep::new(clock::now() + self.timeout),

View File

@@ -64,7 +64,7 @@ tokio-native-tls = { version = "0.3", optional = true }
[dev-dependencies]
actix-rt = "2.2.0"
actix-server = "2.0.0-beta.3"
actix-server = "2.0.0-beta.5"
bytes = "1"
env_logger = "0.8"
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }