1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-08-12 14:27:05 +02:00

Compare commits

..

60 Commits

Author SHA1 Message Date
Nikolay Kim
94e673b50b Add arbiter specific storage 2019-09-02 15:03:03 -07:00
Nikolay Kim
1a644c6bb1 Check service readiness for new_apply_cfg combinator 2019-08-27 05:28:15 +06:00
Yuki Okushi
aad013f559 Fix clippy warnings (#40)
Add explicit `dyn`s

Remove let binding

Use +=

Return false

Derive Default for TcpConnector

Squash if/else

Remove unnecessary return keywords

Use is_empty()

Fix clippy attribute

Allow mut_from_ref
2019-08-17 05:15:51 +09:00
Aron Heinecke
7a18d9da26 Add check for minimum rust version (#39) 2019-08-07 17:49:29 -07:00
Aron Heinecke
d59b8ce62e Fix invalid minimum version (#38) 2019-08-07 17:48:31 -07:00
Nikolay Kim
3821d511d0 prep actix-threadpool release 2019-08-05 09:54:49 -07:00
Nikolay Kim
62e429cb0c Merge branch 'master' of github.com:actix/actix-net 2019-08-05 09:53:03 -07:00
Nikolay Kim
a2643d475a Add ConnectService and OpensslConnectService 2019-08-05 09:52:50 -07:00
Sven-Hendrik Haase
34c259a8b5 Merge pull request #35 from ignatenkobrain/parking_lot
threadpool: Update parking_lot to 0.9
2019-08-05 17:30:19 +02:00
Igor Gnatenko
8b398c3386 threadpool: Update parking_lot to 0.9
Signed-off-by: Igor Gnatenko <i.gnatenko.brain@gmail.com>
2019-08-04 15:46:14 +02:00
Neil Locketz
0baceb0e56 Fix typo in desc (#34) 2019-07-30 09:35:57 -07:00
Michael Snoyman
6be1f37f6c Minor typo corrections in docs (#33) 2019-07-25 11:46:11 +06:00
Nikolay Kim
a742768feb bump version 2019-07-24 14:16:25 +06:00
Marat Safin
f913872159 add rustls support for connect (#31) 2019-07-24 14:14:26 +06:00
Nikolay Kim
41145040e1 remove ClonableService 2019-07-19 11:03:16 +06:00
Nikolay Kim
311bb14d97 add unix domain sockets support #3 2019-07-18 17:05:40 +06:00
Nikolay Kim
2955e49d78 add unix domain sockets support 2019-07-18 16:43:42 +06:00
Nikolay Kim
9d1b428b34 undeprecate framed transport 2019-07-17 13:31:00 +06:00
Nikolay Kim
42d526bced mark some fn as unsafe 2019-07-17 11:16:38 +06:00
Nikolay Kim
23a230a83b deprecate ClonableService and FramedTransport 2019-07-17 10:57:52 +06:00
Nikolay Kim
411e31786f update actix-connect changes 2019-07-17 10:33:47 +06:00
Nikolay Kim
b491d373b1 update actix-rt changes 2019-07-17 10:30:59 +06:00
Jeff Muizelaar
9271b95c87 Avoid a copy of the Future when initializing the Box. (#29)
Future's can be pretty big (> 1500 bytes) so this probably worth doing.

I confirmed with memcpy-find that this did infact eliminate two ~1500
byte copies from the actix-web basic example.
2019-07-17 10:29:22 +06:00
Jan Michael Auer
1b3cd0d88c Expose Connect addrs (#30) 2019-07-17 06:17:51 +06:00
Nikolay Kim
da302d4b7a fix disconnect callback 2019-07-03 13:02:03 +06:00
Nikolay Kim
922a919572 simple callback 2019-07-02 12:35:27 +06:00
Nikolay Kim
5a62175b6e add disconnect callback 2019-07-02 12:10:05 +06:00
Nikolay Kim
5445e341c3 give access to io object during connect stage 2019-07-01 22:37:59 +06:00
Nikolay Kim
1b17d274a0 refactor connect stage 2019-07-01 11:20:24 +06:00
Nikolay Kim
9d8b3e6275 impl Stream and Sink for Connect 2019-06-30 22:58:23 +06:00
Nikolay Kim
27baf03f64 Do not block on sink drop for FramedTransport 2019-06-26 15:20:56 +06:00
Nikolay Kim
205cac82ce add custom framed dispatcher service 2019-06-26 15:19:40 +06:00
Nikolay Kim
07708c5e9a prepare rt release 2019-06-22 09:02:17 +06:00
Nikolay Kim
1c04ad3238 Merge pull request #22 from GeorgeHahn/with-external-runtime
Allow Actix to be started on an external CurrentThread runtime
2019-06-22 08:53:19 +06:00
Nikolay Kim
66aa21740c Merge pull request #28 from ignatenkobrain/deps
chore: Update derive_more to 0.15
2019-06-18 22:26:37 +06:00
Igor Gnatenko
b183cb3324 chore: Update derive_more to 0.15
Signed-off-by: Igor Gnatenko <i.gnatenko.brain@gmail.com>
2019-06-18 10:25:01 +02:00
Nikolay Kim
158482cd2f Add new_apply_cfg function 2019-06-06 14:28:07 +06:00
George Hahn
9e61f62871 new_async -> run_in_executor and return future directly + builder cleanup 2019-06-05 12:51:59 -05:00
Nikolay Kim
7051888289 prepare actix-threadpool release 2019-06-05 08:09:46 +06:00
Nikolay Kim
0caa47fc47 Merge pull request #27 from ignatenkobrain/license
Include license files into all sub-crates
2019-06-01 16:48:42 +06:00
Nikolay Kim
6d1cbb2d2f Merge pull request #26 from ignatenkobrain/master
chore: Update parking_lot to 0.8
2019-06-01 16:47:56 +06:00
Igor Gnatenko
ca289ddf7f Include license files into all sub-crates
Signed-off-by: Igor Gnatenko <i.gnatenko.brain@gmail.com>
2019-05-30 20:38:44 +02:00
Igor Gnatenko
ad9a197916 chore: Update parking_lot to 0.8
Signed-off-by: Igor Gnatenko <i.gnatenko.brain@gmail.com>
2019-05-30 20:30:43 +02:00
George Hahn
c4f05e033f fixup: fix new_async doc comment 2019-05-24 10:29:52 -05:00
George Hahn
048314913c Enable System to be executed on an external CurrentThread runtime 2019-05-23 13:34:47 -05:00
Nikolay Kim
c1b183e1ce Merge branch 'master' of github.com:actix/actix-net 2019-05-18 10:56:51 -07:00
Nikolay Kim
87bc3dacd9 use u64 for shutdown_timeout 2019-05-18 10:56:41 -07:00
Nikolay Kim
0156f479a0 Merge pull request #19 from pka/patch-1
Fix typo
2019-05-15 13:12:26 -07:00
Pirmin Kalberer
139fa3b9a2 Fix typo 2019-05-15 20:51:24 +02:00
Nikolay Kim
a14f612382 remove debug prints 2019-05-15 10:29:10 -07:00
Nikolay Kim
059e2ad042 Fix checked resource match 2019-05-15 10:21:29 -07:00
Nikolay Kim
fdf2a6f422 prepare actix-utils release 2019-05-15 08:31:40 -07:00
Nikolay Kim
fc2631c852 merge remote 2019-05-14 17:37:14 -07:00
Nikolay Kim
d51b210ae7 Merge branch 'master' of github.com:actix/actix-net 2019-05-14 17:36:18 -07:00
Nikolay Kim
0a6cded975 change Either constructor 2019-05-14 17:32:50 -07:00
Nikolay Kim
14e3933d8b Merge pull request #17 from neoeinstein/tower-interop
Second iteration on tower interop
2019-05-12 20:03:42 -07:00
Nikolay Kim
837504c10f update deps 2019-05-12 08:40:42 -07:00
Marcus Griep
c7676df697 Add documentation and doctests 2019-05-03 10:08:49 -04:00
Marcus Griep
ecf7a11a20 Add convenience methods to wrap with middleware 2019-05-02 17:47:37 -04:00
Marcus Griep
686958fe0c Add reciprical compatibility layer for tower 2019-05-02 17:22:22 -04:00
95 changed files with 3189 additions and 375 deletions

View File

@@ -10,9 +10,10 @@ matrix:
include:
- rust: stable
- rust: beta
- rust: nightly-2019-03-02
- rust: 1.36.0
- rust: nightly-2019-06-15
allow_failures:
- rust: nightly-2019-03-02
- rust: nightly-2019-06-15
env:
global:
@@ -25,8 +26,8 @@ before_install:
- sudo apt-get install -y openssl libssl-dev libelf-dev libdw-dev cmake gcc binutils-dev libiberty-dev
before_cache: |
if [[ "$TRAVIS_RUST_VERSION" == "nightly-2019-03-02" ]]; then
RUSTFLAGS="--cfg procmacro2_semver_exempt" cargo install cargo-tarpaulin
if [[ "$TRAVIS_RUST_VERSION" == "nightly-2019-06-15" ]]; then
RUSTFLAGS="--cfg procmacro2_semver_exempt" cargo install --version 0.6.11 cargo-tarpaulin
fi
# Add clippy
@@ -35,14 +36,14 @@ before_script:
script:
- |
if [[ "$TRAVIS_RUST_VERSION" != "nightly-2019-03-02" ]]; then
if [[ "$TRAVIS_RUST_VERSION" != "nightly-2019-06-15" ]]; then
cargo clean
cargo test --all --all-features -- --nocapture
fi
after_success:
- |
if [[ "$TRAVIS_RUST_VERSION" == "nightly-2019-03-02" ]]; then
if [[ "$TRAVIS_RUST_VERSION" == "nightly-2019-06-15" ]]; then
taskset -c 0 cargo tarpaulin --all --all-features --out Xml
echo "Uploaded code coverage"
bash <(curl -s https://codecov.io/bash)

View File

@@ -2,7 +2,7 @@
name = "actix-net"
version = "0.3.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix net - framework for the compisible network services for Rust"
description = "Actix net - framework for the composable network services for Rust"
readme = "README.md"
keywords = ["network", "framework", "async", "futures"]
homepage = "https://actix.rs"
@@ -24,15 +24,16 @@ members = [
"actix-test-server",
"actix-threadpool",
"actix-tower",
"actix-ioframe",
"actix-utils",
"router",
]
[dev-dependencies]
actix-service = "0.3.3"
actix-service = "0.4.0"
actix-codec = "0.1.1"
actix-rt = "0.2.0"
actix-server = { path="actix-server", features=["ssl"] }
actix-server = { version="0.5.0", features=["ssl"] }
env_logger = "0.6"
futures = "0.1.25"
openssl = "0.10"

View File

@@ -7,7 +7,7 @@ Actix net - framework for composable network services
* [API Documentation (Development)](https://actix.rs/actix-net/actix_net/)
* [Chat on gitter](https://gitter.im/actix/actix)
* Cargo package: [actix-net](https://crates.io/crates/actix-net)
* Minimum supported Rust version: 1.32 or later
* Minimum supported Rust version: 1.36 or later
## Example

1
actix-codec/LICENSE-APACHE Symbolic link
View File

@@ -0,0 +1 @@
../LICENSE-APACHE

1
actix-codec/LICENSE-MIT Symbolic link
View File

@@ -0,0 +1 @@
../LICENSE-MIT

View File

@@ -1,5 +1,24 @@
# Changes
## [0.2.3] - 2019-08-05
* Add `ConnectService` and `OpensslConnectService`
## [0.2.2] - 2019-07-24
* Add `rustls` support
## [0.2.1] - 2019-07-17
### Added
* Expose Connect addrs #30
### Changed
* Update `derive_more` to 0.15
## [0.2.0] - 2019-05-12
### Changed

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-connect"
version = "0.2.0"
version = "0.2.3"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix Connector - tcp connector service"
keywords = ["network", "framework", "async", "futures"]
@@ -26,6 +26,9 @@ default = ["uri"]
# openssl
ssl = ["openssl", "tokio-openssl"]
#rustls
rust-tls = ["rustls", "tokio-rustls", "webpki"]
# support http::Uri as connect address
uri = ["http"]
@@ -33,7 +36,7 @@ uri = ["http"]
actix-service = "0.4.0"
actix-codec = "0.1.2"
actix-utils = "0.4.0"
derive_more = "0.14.0"
derive_more = "0.15"
either = "1.5.2"
futures = "0.1.25"
http = { version = "0.1.17", optional = true }
@@ -46,6 +49,11 @@ trust-dns-resolver = { version="0.11.0", default-features = false }
openssl = { version="0.10", optional = true }
tokio-openssl = { version="0.3", optional = true }
#rustls
rustls = { version = "0.15.2", optional = true }
tokio-rustls = { version = "0.9.1", optional = true }
webpki = { version = "0.19", optional = true }
[dev-dependencies]
bytes = "0.4"
actix-test-server = { version="0.2.2", features=["ssl"] }

View File

@@ -0,0 +1 @@
../LICENSE-APACHE

1
actix-connect/LICENSE-MIT Symbolic link
View File

@@ -0,0 +1 @@
../LICENSE-MIT

View File

@@ -1,5 +1,6 @@
use std::collections::VecDeque;
use std::collections::{vec_deque, VecDeque};
use std::fmt;
use std::iter::{FromIterator, FusedIterator};
use std::net::SocketAddr;
use either::Either;
@@ -77,6 +78,20 @@ impl<T: Address> Connect<T> {
self
}
/// Use addresses.
pub fn set_addrs<I>(mut self, addrs: I) -> Self
where
I: IntoIterator<Item = SocketAddr>,
{
let mut addrs = VecDeque::from_iter(addrs);
self.addr = if addrs.len() < 2 {
addrs.pop_front().map(Either::Left)
} else {
Some(Either::Right(addrs))
};
self
}
/// Host name
pub fn host(&self) -> &str {
self.req.host()
@@ -86,6 +101,28 @@ impl<T: Address> Connect<T> {
pub fn port(&self) -> u16 {
self.req.port().unwrap_or(self.port)
}
/// Preresolved addresses of the request.
pub fn addrs(&self) -> ConnectAddrsIter<'_> {
let inner = match self.addr {
None => Either::Left(None),
Some(Either::Left(addr)) => Either::Left(Some(addr)),
Some(Either::Right(ref addrs)) => Either::Right(addrs.iter()),
};
ConnectAddrsIter { inner }
}
/// Takes preresolved addresses of the request.
pub fn take_addrs(&mut self) -> ConnectTakeAddrsIter {
let inner = match self.addr.take() {
None => Either::Left(None),
Some(Either::Left(addr)) => Either::Left(Some(addr)),
Some(Either::Right(addrs)) => Either::Right(addrs.into_iter()),
};
ConnectTakeAddrsIter { inner }
}
}
impl<T: Address> From<T> for Connect<T> {
@@ -100,6 +137,70 @@ impl<T: Address> fmt::Display for Connect<T> {
}
}
/// Iterator over addresses in a [`Connect`](struct.Connect.html) request.
#[derive(Clone)]
pub struct ConnectAddrsIter<'a> {
inner: Either<Option<SocketAddr>, vec_deque::Iter<'a, SocketAddr>>,
}
impl Iterator for ConnectAddrsIter<'_> {
type Item = SocketAddr;
fn next(&mut self) -> Option<Self::Item> {
match self.inner {
Either::Left(ref mut opt) => opt.take(),
Either::Right(ref mut iter) => iter.next().copied(),
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
match self.inner {
Either::Left(Some(_)) => (1, Some(1)),
Either::Left(None) => (0, Some(0)),
Either::Right(ref iter) => iter.size_hint(),
}
}
}
impl fmt::Debug for ConnectAddrsIter<'_> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_list().entries(self.clone()).finish()
}
}
impl ExactSizeIterator for ConnectAddrsIter<'_> {}
impl FusedIterator for ConnectAddrsIter<'_> {}
/// Owned iterator over addresses in a [`Connect`](struct.Connect.html) request.
#[derive(Debug)]
pub struct ConnectTakeAddrsIter {
inner: Either<Option<SocketAddr>, vec_deque::IntoIter<SocketAddr>>,
}
impl Iterator for ConnectTakeAddrsIter {
type Item = SocketAddr;
fn next(&mut self) -> Option<Self::Item> {
match self.inner {
Either::Left(ref mut opt) => opt.take(),
Either::Right(ref mut iter) => iter.next(),
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
match self.inner {
Either::Left(Some(_)) => (1, Some(1)),
Either::Left(None) => (0, Some(0)),
Either::Right(ref iter) => iter.size_hint(),
}
}
}
impl ExactSizeIterator for ConnectTakeAddrsIter {}
impl FusedIterator for ConnectTakeAddrsIter {}
fn parse(host: &str) -> (&str, Option<u16>) {
let mut parts_iter = host.splitn(2, ':');
if let Some(host) = parts_iter.next() {

View File

@@ -18,6 +18,17 @@ impl<T> TcpConnectorFactory<T> {
pub fn new() -> Self {
TcpConnectorFactory(PhantomData)
}
/// Create tcp connector service
pub fn service(&self) -> TcpConnector<T> {
TcpConnector(PhantomData)
}
}
impl<T> Default for TcpConnectorFactory<T> {
fn default() -> Self {
TcpConnectorFactory(PhantomData)
}
}
impl<T> Clone for TcpConnectorFactory<T> {
@@ -36,12 +47,12 @@ impl<T: Address> NewService for TcpConnectorFactory<T> {
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self, _: &()) -> Self::Future {
ok(TcpConnector(PhantomData))
ok(self.service())
}
}
/// Tcp connector service
#[derive(Debug)]
#[derive(Default, Debug)]
pub struct TcpConnector<T>(PhantomData<T>);
impl<T> TcpConnector<T> {

View File

@@ -16,6 +16,7 @@ mod connect;
mod connector;
mod error;
mod resolver;
mod service;
pub mod ssl;
#[cfg(feature = "uri")]
@@ -29,6 +30,7 @@ pub use self::connect::{Address, Connect, Connection};
pub use self::connector::{TcpConnector, TcpConnectorFactory};
pub use self::error::ConnectError;
pub use self::resolver::{Resolver, ResolverFactory};
pub use self::service::{ConnectService, ConnectServiceFactory};
use actix_service::{NewService, Service, ServiceExt};
use tokio_tcp::TcpStream;

View File

@@ -1,4 +1,3 @@
use std::collections::VecDeque;
use std::marker::PhantomData;
use std::net::SocketAddr;
@@ -26,6 +25,13 @@ impl<T> ResolverFactory<T> {
_t: PhantomData,
}
}
pub fn service(&self) -> Resolver<T> {
Resolver {
resolver: self.resolver.clone(),
_t: PhantomData,
}
}
}
impl<T> Default for ResolverFactory<T> {
@@ -56,10 +62,7 @@ impl<T: Address> NewService for ResolverFactory<T> {
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self, _: &()) -> Self::Future {
ok(Resolver {
resolver: self.resolver.clone(),
_t: PhantomData,
})
ok(self.service())
}
}
@@ -110,17 +113,15 @@ impl<T: Address> Service for Resolver<T> {
fn call(&mut self, mut req: Connect<T>) -> Self::Future {
if req.addr.is_some() {
Either::B(ok(req))
} else if let Ok(ip) = req.host().parse() {
req.addr = Some(either::Either::Left(SocketAddr::new(ip, req.port())));
Either::B(ok(req))
} else {
if let Ok(ip) = req.host().parse() {
req.addr = Some(either::Either::Left(SocketAddr::new(ip, req.port())));
Either::B(ok(req))
} else {
trace!("DNS resolver: resolving host {:?}", req.host());
if self.resolver.is_none() {
self.resolver = Some(get_default_resolver());
}
Either::A(ResolverFuture::new(req, self.resolver.as_ref().unwrap()))
trace!("DNS resolver: resolving host {:?}", req.host());
if self.resolver.is_none() {
self.resolver = Some(get_default_resolver());
}
Either::A(ResolverFuture::new(req, self.resolver.as_ref().unwrap()))
}
}
}
@@ -162,23 +163,20 @@ impl<T: Address> Future for ResolverFuture<T> {
})? {
Async::NotReady => Ok(Async::NotReady),
Async::Ready(ips) => {
let mut req = self.req.take().unwrap();
let mut addrs: VecDeque<_> = ips
.iter()
.map(|ip| SocketAddr::new(ip, req.port()))
.collect();
let req = self.req.take().unwrap();
let port = req.port();
let req = req.set_addrs(ips.iter().map(|ip| SocketAddr::new(ip, port)));
trace!(
"DNS resolver: host {:?} resolved to {:?}",
req.host(),
addrs
req.addrs()
);
if addrs.is_empty() {
if req.addr.is_none() {
Err(ConnectError::NoRecords)
} else if addrs.len() == 1 {
req.addr = Some(either::Either::Left(addrs.pop_front().unwrap()));
Ok(Async::Ready(req))
} else {
req.addr = Some(either::Either::Right(addrs));
Ok(Async::Ready(req))
}
}

View File

@@ -0,0 +1,123 @@
use actix_service::{NewService, Service};
use futures::future::{ok, FutureResult};
use futures::{try_ready, Async, Future, Poll};
use tokio_tcp::TcpStream;
use trust_dns_resolver::AsyncResolver;
use crate::connect::{Address, Connect, Connection};
use crate::connector::{TcpConnector, TcpConnectorFactory};
use crate::error::ConnectError;
use crate::resolver::{Resolver, ResolverFactory};
pub struct ConnectServiceFactory<T> {
tcp: TcpConnectorFactory<T>,
resolver: ResolverFactory<T>,
}
impl<T> ConnectServiceFactory<T> {
/// Construct new ConnectService factory
pub fn new() -> Self {
ConnectServiceFactory {
tcp: TcpConnectorFactory::default(),
resolver: ResolverFactory::default(),
}
}
/// Construct new connect service with custom dns resolver
pub fn with_resolver(resolver: AsyncResolver) -> Self {
ConnectServiceFactory {
tcp: TcpConnectorFactory::default(),
resolver: ResolverFactory::new(resolver),
}
}
/// Construct new service
pub fn service(&self) -> ConnectService<T> {
ConnectService {
tcp: self.tcp.service(),
resolver: self.resolver.service(),
}
}
}
impl<T> Default for ConnectServiceFactory<T> {
fn default() -> Self {
ConnectServiceFactory {
tcp: TcpConnectorFactory::default(),
resolver: ResolverFactory::default(),
}
}
}
impl<T> Clone for ConnectServiceFactory<T> {
fn clone(&self) -> Self {
ConnectServiceFactory {
tcp: self.tcp.clone(),
resolver: self.resolver.clone(),
}
}
}
impl<T: Address> NewService for ConnectServiceFactory<T> {
type Request = Connect<T>;
type Response = Connection<T, TcpStream>;
type Error = ConnectError;
type Config = ();
type Service = ConnectService<T>;
type InitError = ();
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self, _: &()) -> Self::Future {
ok(self.service())
}
}
#[derive(Clone)]
pub struct ConnectService<T> {
tcp: TcpConnector<T>,
resolver: Resolver<T>,
}
impl<T: Address> Service for ConnectService<T> {
type Request = Connect<T>;
type Response = Connection<T, TcpStream>;
type Error = ConnectError;
type Future = ConnectServiceResponse<T>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, req: Connect<T>) -> Self::Future {
ConnectServiceResponse {
fut1: Some(self.resolver.call(req)),
fut2: None,
tcp: self.tcp.clone(),
}
}
}
pub struct ConnectServiceResponse<T: Address> {
fut1: Option<<Resolver<T> as Service>::Future>,
fut2: Option<<TcpConnector<T> as Service>::Future>,
tcp: TcpConnector<T>,
}
impl<T: Address> Future for ConnectServiceResponse<T> {
type Item = Connection<T, TcpStream>;
type Error = ConnectError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(ref mut fut) = self.fut1 {
let res = try_ready!(fut.poll());
let _ = self.fut1.take();
self.fut2 = Some(self.tcp.call(res));
}
if let Some(ref mut fut) = self.fut2 {
return fut.poll();
}
Ok(Async::NotReady)
}
}

View File

@@ -3,4 +3,10 @@
#[cfg(feature = "ssl")]
mod openssl;
#[cfg(feature = "ssl")]
pub use self::openssl::OpensslConnector;
pub use self::openssl::{
OpensslConnectService, OpensslConnectServiceFactory, OpensslConnector,
};
#[cfg(feature = "rust-tls")]
mod rustls;
#[cfg(feature = "rust-tls")]
pub use self::rustls::RustlsConnector;

View File

@@ -1,13 +1,17 @@
use std::fmt;
use std::marker::PhantomData;
use std::{fmt, io};
use actix_codec::{AsyncRead, AsyncWrite};
use actix_service::{NewService, Service};
use futures::{future::ok, future::FutureResult, Async, Future, Poll};
use futures::{future::ok, future::FutureResult, try_ready, Async, Future, Poll};
use openssl::ssl::{HandshakeError, SslConnector};
use tokio_openssl::{ConnectAsync, SslConnectorExt, SslStream};
use tokio_tcp::TcpStream;
use trust_dns_resolver::AsyncResolver;
use crate::{Address, Connection};
use crate::{
Address, Connect, ConnectError, ConnectService, ConnectServiceFactory, Connection,
};
/// Openssl connector factory
pub struct OpensslConnector<T, U> {
@@ -77,6 +81,15 @@ pub struct OpensslConnectorService<T, U> {
_t: PhantomData<(T, U)>,
}
impl<T, U> Clone for OpensslConnectorService<T, U> {
fn clone(&self) -> Self {
Self {
connector: self.connector.clone(),
_t: PhantomData,
}
}
}
impl<T: Address, U> Service for OpensslConnectorService<T, U>
where
U: AsyncRead + AsyncWrite + fmt::Debug,
@@ -126,3 +139,113 @@ where
}
}
}
pub struct OpensslConnectServiceFactory<T> {
tcp: ConnectServiceFactory<T>,
openssl: OpensslConnector<T, TcpStream>,
}
impl<T> OpensslConnectServiceFactory<T> {
/// Construct new OpensslConnectService factory
pub fn new(connector: SslConnector) -> Self {
OpensslConnectServiceFactory {
tcp: ConnectServiceFactory::default(),
openssl: OpensslConnector::new(connector),
}
}
/// Construct new connect service with custom dns resolver
pub fn with_resolver(connector: SslConnector, resolver: AsyncResolver) -> Self {
OpensslConnectServiceFactory {
tcp: ConnectServiceFactory::with_resolver(resolver),
openssl: OpensslConnector::new(connector),
}
}
/// Construct openssl connect service
pub fn service(&self) -> OpensslConnectService<T> {
OpensslConnectService {
tcp: self.tcp.service(),
openssl: OpensslConnectorService {
connector: self.openssl.connector.clone(),
_t: PhantomData,
},
}
}
}
impl<T> Clone for OpensslConnectServiceFactory<T> {
fn clone(&self) -> Self {
OpensslConnectServiceFactory {
tcp: self.tcp.clone(),
openssl: self.openssl.clone(),
}
}
}
impl<T: Address> NewService for OpensslConnectServiceFactory<T> {
type Request = Connect<T>;
type Response = SslStream<TcpStream>;
type Error = ConnectError;
type Config = ();
type Service = OpensslConnectService<T>;
type InitError = ();
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self, _: &()) -> Self::Future {
ok(self.service())
}
}
#[derive(Clone)]
pub struct OpensslConnectService<T> {
tcp: ConnectService<T>,
openssl: OpensslConnectorService<T, TcpStream>,
}
impl<T: Address> Service for OpensslConnectService<T> {
type Request = Connect<T>;
type Response = SslStream<TcpStream>;
type Error = ConnectError;
type Future = OpensslConnectServiceResponse<T>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, req: Connect<T>) -> Self::Future {
OpensslConnectServiceResponse {
fut1: Some(self.tcp.call(req)),
fut2: None,
openssl: self.openssl.clone(),
}
}
}
pub struct OpensslConnectServiceResponse<T: Address> {
fut1: Option<<ConnectService<T> as Service>::Future>,
fut2: Option<<OpensslConnectorService<T, TcpStream> as Service>::Future>,
openssl: OpensslConnectorService<T, TcpStream>,
}
impl<T: Address> Future for OpensslConnectServiceResponse<T> {
type Item = SslStream<TcpStream>;
type Error = ConnectError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(ref mut fut) = self.fut1 {
let res = try_ready!(fut.poll());
let _ = self.fut1.take();
self.fut2 = Some(self.openssl.call(res));
}
if let Some(ref mut fut) = self.fut2 {
let connect = try_ready!(fut
.poll()
.map_err(|e| ConnectError::Io(io::Error::new(io::ErrorKind::Other, e))));
Ok(Async::Ready(connect.into_parts().0))
} else {
Ok(Async::NotReady)
}
}
}

View File

@@ -0,0 +1,133 @@
use std::fmt;
use std::marker::PhantomData;
use actix_codec::{AsyncRead, AsyncWrite};
use actix_service::{NewService, Service};
use futures::{future::ok, future::FutureResult, Async, Future, Poll};
use std::sync::Arc;
use tokio_rustls::{
rustls::{ClientConfig, ClientSession},
Connect, TlsConnector, TlsStream,
};
use webpki::DNSNameRef;
use crate::{Address, Connection};
/// Rustls connector factory
pub struct RustlsConnector<T, U> {
connector: Arc<ClientConfig>,
_t: PhantomData<(T, U)>,
}
impl<T, U> RustlsConnector<T, U> {
pub fn new(connector: Arc<ClientConfig>) -> Self {
RustlsConnector {
connector,
_t: PhantomData,
}
}
}
impl<T, U> RustlsConnector<T, U>
where
T: Address,
U: AsyncRead + AsyncWrite + fmt::Debug,
{
pub fn service(
connector: Arc<ClientConfig>,
) -> impl Service<
Request = Connection<T, U>,
Response = Connection<T, TlsStream<U, ClientSession>>,
Error = std::io::Error,
> {
RustlsConnectorService {
connector: connector,
_t: PhantomData,
}
}
}
impl<T, U> Clone for RustlsConnector<T, U> {
fn clone(&self) -> Self {
Self {
connector: self.connector.clone(),
_t: PhantomData,
}
}
}
impl<T: Address, U> NewService for RustlsConnector<T, U>
where
U: AsyncRead + AsyncWrite + fmt::Debug,
{
type Request = Connection<T, U>;
type Response = Connection<T, TlsStream<U, ClientSession>>;
type Error = std::io::Error;
type Config = ();
type Service = RustlsConnectorService<T, U>;
type InitError = ();
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self, _: &()) -> Self::Future {
ok(RustlsConnectorService {
connector: self.connector.clone(),
_t: PhantomData,
})
}
}
pub struct RustlsConnectorService<T, U> {
connector: Arc<ClientConfig>,
_t: PhantomData<(T, U)>,
}
impl<T: Address, U> Service for RustlsConnectorService<T, U>
where
U: AsyncRead + AsyncWrite + fmt::Debug,
{
type Request = Connection<T, U>;
type Response = Connection<T, TlsStream<U, ClientSession>>;
type Error = std::io::Error;
type Future = ConnectAsyncExt<T, U>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, stream: Connection<T, U>) -> Self::Future {
trace!("SSL Handshake start for: {:?}", stream.host());
let (io, stream) = stream.replace(());
let host = DNSNameRef::try_from_ascii_str(stream.host()).unwrap();
ConnectAsyncExt {
fut: TlsConnector::from(self.connector.clone()).connect(host, io),
stream: Some(stream),
}
}
}
pub struct ConnectAsyncExt<T, U> {
fut: Connect<U>,
stream: Option<Connection<T, ()>>,
}
impl<T: Address, U> Future for ConnectAsyncExt<T, U>
where
U: AsyncRead + AsyncWrite + fmt::Debug,
{
type Item = Connection<T, TlsStream<U, ClientSession>>;
type Error = std::io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.fut.poll().map_err(|e| {
trace!("SSL Handshake error: {:?}", e);
e
})? {
Async::Ready(stream) => {
let s = self.stream.take().unwrap();
trace!("SSL Handshake success: {:?}", s.host());
Ok(Async::Ready(s.replace(stream).1))
}
Async::NotReady => Ok(Async::NotReady),
}
}
}

View File

@@ -26,6 +26,22 @@ fn test_string() {
assert_eq!(con.peer_addr().unwrap(), srv.addr());
}
#[cfg(feature = "rust-tls")]
#[test]
fn test_rustls_string() {
let mut srv = TestServer::with(|| {
service_fn(|io: Io<tokio_tcp::TcpStream>| {
Framed::new(io.into_parts().0, BytesCodec)
.send(Bytes::from_static(b"test"))
.then(|_| Ok::<_, ()>(()))
})
});
let mut conn = default_connector();
let addr = format!("localhost:{}", srv.port());
let con = srv.run_on(move || conn.call(addr.into())).unwrap();
assert_eq!(con.peer_addr().unwrap(), srv.addr());
}
#[test]
fn test_static_str() {
let mut srv = TestServer::with(|| {
@@ -107,3 +123,20 @@ fn test_uri() {
let con = srv.run_on(move || conn.call(addr.into())).unwrap();
assert_eq!(con.peer_addr().unwrap(), srv.addr());
}
#[cfg(feature = "rust-tls")]
#[test]
fn test_rustls_uri() {
let mut srv = TestServer::with(|| {
service_fn(|io: Io<tokio_tcp::TcpStream>| {
Framed::new(io.into_parts().0, BytesCodec)
.send(Bytes::from_static(b"test"))
.then(|_| Ok::<_, ()>(()))
})
});
let mut conn = default_connector();
let addr = Uri::try_from(format!("https://localhost:{}", srv.port())).unwrap();
let con = srv.run_on(move || conn.call(addr.into())).unwrap();
assert_eq!(con.peer_addr().unwrap(), srv.addr());
}

5
actix-ioframe/CHANGES.md Normal file
View File

@@ -0,0 +1,5 @@
# Changes
## [0.1.0] - 2019-07-17
* Initial release

35
actix-ioframe/Cargo.toml Normal file
View File

@@ -0,0 +1,35 @@
[package]
name = "actix-ioframe"
version = "0.1.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix framed service"
keywords = ["network", "framework", "async", "futures"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-ioframed/"
categories = ["network-programming", "asynchronous"]
license = "MIT/Apache-2.0"
exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
edition = "2018"
workspace = ".."
[lib]
name = "actix_ioframe"
path = "src/lib.rs"
[dependencies]
actix-service = "0.4.1"
actix-codec = "0.1.2"
bytes = "0.4"
either = "1.5.2"
futures = "0.1.25"
tokio-current-thread = "0.1.4"
log = "0.4"
[dev-dependencies]
actix-rt = "0.2.2"
actix-connect = "0.2.0"
actix-test-server = "0.2.2"
actix-server-config = "0.1.1"
tokio-tcp = "0.1"
tokio-timer = "0.2"

View File

@@ -0,0 +1 @@
../LICENSE-APACHE

1
actix-ioframe/LICENSE-MIT Symbolic link
View File

@@ -0,0 +1 @@
../LICENSE-MIT

35
actix-ioframe/src/cell.rs Normal file
View File

@@ -0,0 +1,35 @@
//! Custom cell impl
use std::cell::UnsafeCell;
use std::fmt;
use std::rc::Rc;
pub(crate) struct Cell<T> {
inner: Rc<UnsafeCell<T>>,
}
impl<T> Clone for Cell<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<T: fmt::Debug> fmt::Debug for Cell<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.inner.fmt(f)
}
}
impl<T> Cell<T> {
pub fn new(inner: T) -> Self {
Self {
inner: Rc::new(UnsafeCell::new(inner)),
}
}
pub(crate) unsafe fn get_mut(&mut self) -> &mut T {
&mut *self.inner.as_ref().get()
}
}

View File

@@ -0,0 +1,110 @@
use std::marker::PhantomData;
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
use futures::unsync::mpsc;
use crate::dispatcher::FramedMessage;
use crate::sink::Sink;
pub struct Connect<Io, St = (), Codec = ()> {
io: Io,
_t: PhantomData<(St, Codec)>,
}
impl<Io> Connect<Io>
where
Io: AsyncRead + AsyncWrite,
{
pub(crate) fn new(io: Io) -> Self {
Self {
io,
_t: PhantomData,
}
}
pub fn codec<Codec>(self, codec: Codec) -> ConnectResult<Io, (), Codec>
where
Codec: Encoder + Decoder,
{
let (tx, rx) = mpsc::unbounded();
let sink = Sink::new(tx);
ConnectResult {
state: (),
framed: Framed::new(self.io, codec),
rx,
sink,
}
}
}
pub struct ConnectResult<Io, St, Codec: Encoder + Decoder> {
pub(crate) state: St,
pub(crate) framed: Framed<Io, Codec>,
pub(crate) rx: mpsc::UnboundedReceiver<FramedMessage<<Codec as Encoder>::Item>>,
pub(crate) sink: Sink<<Codec as Encoder>::Item>,
}
impl<Io, St, Codec: Encoder + Decoder> ConnectResult<Io, St, Codec> {
#[inline]
pub fn sink(&self) -> &Sink<<Codec as Encoder>::Item> {
&self.sink
}
#[inline]
pub fn get_ref(&self) -> &Io {
self.framed.get_ref()
}
#[inline]
pub fn get_mut(&mut self) -> &mut Io {
self.framed.get_mut()
}
#[inline]
pub fn state<S>(self, state: S) -> ConnectResult<Io, S, Codec> {
ConnectResult {
state,
framed: self.framed,
rx: self.rx,
sink: self.sink,
}
}
}
impl<Io, St, Codec> futures::Stream for ConnectResult<Io, St, Codec>
where
Io: AsyncRead + AsyncWrite,
Codec: Encoder + Decoder,
{
type Item = <Codec as Decoder>::Item;
type Error = <Codec as Decoder>::Error;
fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
self.framed.poll()
}
}
impl<Io, St, Codec> futures::Sink for ConnectResult<Io, St, Codec>
where
Io: AsyncRead + AsyncWrite,
Codec: Encoder + Decoder,
{
type SinkItem = <Codec as Encoder>::Item;
type SinkError = <Codec as Encoder>::Error;
fn start_send(
&mut self,
item: Self::SinkItem,
) -> futures::StartSend<Self::SinkItem, Self::SinkError> {
self.framed.start_send(item)
}
fn poll_complete(&mut self) -> futures::Poll<(), Self::SinkError> {
self.framed.poll_complete()
}
fn close(&mut self) -> futures::Poll<(), Self::SinkError> {
self.framed.close()
}
}

View File

@@ -0,0 +1,325 @@
//! Framed dispatcher service and related utilities
use std::collections::VecDeque;
use std::mem;
use std::rc::Rc;
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
use actix_service::{IntoService, Service};
use futures::task::AtomicTask;
use futures::unsync::{mpsc, oneshot};
use futures::{Async, Future, Poll, Sink as FutureSink, Stream};
use log::debug;
use crate::cell::Cell;
use crate::error::ServiceError;
use crate::item::Item;
use crate::sink::Sink;
use crate::state::State;
type Request<S, U> = Item<S, U>;
type Response<U> = <U as Encoder>::Item;
pub(crate) enum FramedMessage<T> {
Message(T),
Close,
WaitClose(oneshot::Sender<()>),
}
/// FramedTransport - is a future that reads frames from Framed object
/// and pass then to the service.
pub(crate) struct FramedDispatcher<St, S, T, U>
where
S: Service<Request = Request<St, U>, Response = Option<Response<U>>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Encoder + Decoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
service: S,
sink: Sink<<U as Encoder>::Item>,
state: State<St>,
dispatch_state: FramedState<S, U>,
framed: Framed<T, U>,
rx: Option<mpsc::UnboundedReceiver<FramedMessage<<U as Encoder>::Item>>>,
inner: Cell<FramedDispatcherInner<<U as Encoder>::Item, S::Error>>,
disconnect: Option<Rc<dyn Fn(&mut St, bool)>>,
}
impl<St, S, T, U> FramedDispatcher<St, S, T, U>
where
S: Service<Request = Request<St, U>, Response = Option<Response<U>>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
pub(crate) fn new<F: IntoService<S>>(
framed: Framed<T, U>,
state: State<St>,
service: F,
rx: mpsc::UnboundedReceiver<FramedMessage<<U as Encoder>::Item>>,
sink: Sink<<U as Encoder>::Item>,
disconnect: Option<Rc<dyn Fn(&mut St, bool)>>,
) -> Self {
FramedDispatcher {
framed,
state,
sink,
disconnect,
rx: Some(rx),
service: service.into_service(),
dispatch_state: FramedState::Processing,
inner: Cell::new(FramedDispatcherInner {
buf: VecDeque::new(),
task: AtomicTask::new(),
}),
}
}
}
enum FramedState<S: Service, U: Encoder + Decoder> {
Processing,
Error(ServiceError<S::Error, U>),
FramedError(ServiceError<S::Error, U>),
FlushAndStop(Vec<oneshot::Sender<()>>),
Stopping,
}
impl<S: Service, U: Encoder + Decoder> FramedState<S, U> {
fn stop(&mut self, tx: Option<oneshot::Sender<()>>) {
match self {
FramedState::FlushAndStop(ref mut vec) => {
if let Some(tx) = tx {
vec.push(tx)
}
}
FramedState::Processing => {
*self = FramedState::FlushAndStop(if let Some(tx) = tx {
vec![tx]
} else {
Vec::new()
})
}
FramedState::Error(_) | FramedState::FramedError(_) | FramedState::Stopping => {
if let Some(tx) = tx {
let _ = tx.send(());
}
}
}
}
}
struct FramedDispatcherInner<I, E> {
buf: VecDeque<Result<I, E>>,
task: AtomicTask,
}
impl<St, S, T, U> FramedDispatcher<St, S, T, U>
where
S: Service<Request = Request<St, U>, Response = Option<Response<U>>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
fn disconnect(&mut self, error: bool) {
if let Some(ref disconnect) = self.disconnect {
(&*disconnect)(&mut *self.state.get_mut(), error);
}
}
fn poll_read(&mut self) -> bool {
loop {
match self.service.poll_ready() {
Ok(Async::Ready(_)) => {
let item = match self.framed.poll() {
Ok(Async::Ready(Some(el))) => el,
Err(err) => {
self.dispatch_state =
FramedState::FramedError(ServiceError::Decoder(err));
return true;
}
Ok(Async::NotReady) => return false,
Ok(Async::Ready(None)) => {
self.dispatch_state = FramedState::Stopping;
return true;
}
};
let mut cell = self.inner.clone();
unsafe { cell.get_mut().task.register() };
tokio_current_thread::spawn(
self.service
.call(Item::new(self.state.clone(), self.sink.clone(), item))
.then(move |item| {
let item = match item {
Ok(Some(item)) => Ok(item),
Ok(None) => return Ok(()),
Err(err) => Err(err),
};
unsafe {
let inner = cell.get_mut();
inner.buf.push_back(item);
inner.task.notify();
}
Ok(())
}),
);
}
Ok(Async::NotReady) => return false,
Err(err) => {
self.dispatch_state = FramedState::Error(ServiceError::Service(err));
return true;
}
}
}
}
/// write to framed object
fn poll_write(&mut self) -> bool {
let inner = unsafe { self.inner.get_mut() };
let mut rx_done = self.rx.is_none();
let mut buf_empty = inner.buf.is_empty();
loop {
while !self.framed.is_write_buf_full() {
if !buf_empty {
match inner.buf.pop_front().unwrap() {
Ok(msg) => {
if let Err(err) = self.framed.force_send(msg) {
self.dispatch_state =
FramedState::FramedError(ServiceError::Encoder(err));
return true;
}
buf_empty = inner.buf.is_empty();
}
Err(err) => {
self.dispatch_state =
FramedState::Error(ServiceError::Service(err));
return true;
}
}
}
if !rx_done && self.rx.is_some() {
match self.rx.as_mut().unwrap().poll() {
Ok(Async::Ready(Some(FramedMessage::Message(msg)))) => {
if let Err(err) = self.framed.force_send(msg) {
self.dispatch_state =
FramedState::FramedError(ServiceError::Encoder(err));
return true;
}
}
Ok(Async::Ready(Some(FramedMessage::Close))) => {
self.dispatch_state.stop(None);
return true;
}
Ok(Async::Ready(Some(FramedMessage::WaitClose(tx)))) => {
self.dispatch_state.stop(Some(tx));
return true;
}
Ok(Async::Ready(None)) => {
rx_done = true;
let _ = self.rx.take();
}
Ok(Async::NotReady) => rx_done = true,
Err(_e) => {
rx_done = true;
let _ = self.rx.take();
}
}
}
if rx_done && buf_empty {
break;
}
}
if !self.framed.is_write_buf_empty() {
match self.framed.poll_complete() {
Ok(Async::NotReady) => break,
Err(err) => {
debug!("Error sending data: {:?}", err);
self.dispatch_state =
FramedState::FramedError(ServiceError::Encoder(err));
return true;
}
Ok(Async::Ready(_)) => (),
}
} else {
break;
}
}
false
}
}
impl<St, S, T, U> Future for FramedDispatcher<St, S, T, U>
where
S: Service<Request = Request<St, U>, Response = Option<Response<U>>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
type Item = ();
type Error = ServiceError<S::Error, U>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match mem::replace(&mut self.dispatch_state, FramedState::Processing) {
FramedState::Processing => {
if self.poll_read() || self.poll_write() {
self.poll()
} else {
Ok(Async::NotReady)
}
}
FramedState::Error(err) => {
if self.framed.is_write_buf_empty()
|| (self.poll_write() || self.framed.is_write_buf_empty())
{
self.disconnect(true);
Err(err)
} else {
self.dispatch_state = FramedState::Error(err);
Ok(Async::NotReady)
}
}
FramedState::FlushAndStop(mut vec) => {
if !self.framed.is_write_buf_empty() {
match self.framed.poll_complete() {
Err(err) => {
debug!("Error sending data: {:?}", err);
}
Ok(Async::NotReady) => {
self.dispatch_state = FramedState::FlushAndStop(vec);
return Ok(Async::NotReady);
}
Ok(Async::Ready(_)) => (),
}
};
for tx in vec.drain(..) {
let _ = tx.send(());
}
self.disconnect(false);
Ok(Async::Ready(()))
}
FramedState::FramedError(err) => {
self.disconnect(true);
Err(err)
}
FramedState::Stopping => {
self.disconnect(false);
Ok(Async::Ready(()))
}
}
}
}

View File

@@ -0,0 +1,49 @@
use std::fmt;
use actix_codec::{Decoder, Encoder};
/// Framed service errors
pub enum ServiceError<E, U: Encoder + Decoder> {
/// Inner service error
Service(E),
/// Encoder parse error
Encoder(<U as Encoder>::Error),
/// Decoder parse error
Decoder(<U as Decoder>::Error),
}
impl<E, U: Encoder + Decoder> From<E> for ServiceError<E, U> {
fn from(err: E) -> Self {
ServiceError::Service(err)
}
}
impl<E, U: Encoder + Decoder> fmt::Debug for ServiceError<E, U>
where
E: fmt::Debug,
<U as Encoder>::Error: fmt::Debug,
<U as Decoder>::Error: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match *self {
ServiceError::Service(ref e) => write!(fmt, "ServiceError::Service({:?})", e),
ServiceError::Encoder(ref e) => write!(fmt, "ServiceError::Encoder({:?})", e),
ServiceError::Decoder(ref e) => write!(fmt, "ServiceError::Encoder({:?})", e),
}
}
}
impl<E, U: Encoder + Decoder> fmt::Display for ServiceError<E, U>
where
E: fmt::Display,
<U as Encoder>::Error: fmt::Debug,
<U as Decoder>::Error: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match *self {
ServiceError::Service(ref e) => write!(fmt, "{}", e),
ServiceError::Encoder(ref e) => write!(fmt, "{:?}", e),
ServiceError::Decoder(ref e) => write!(fmt, "{:?}", e),
}
}
}

90
actix-ioframe/src/item.rs Normal file
View File

@@ -0,0 +1,90 @@
use std::cell::{Ref, RefMut};
use std::fmt;
use std::ops::{Deref, DerefMut};
use actix_codec::{Decoder, Encoder};
use crate::sink::Sink;
use crate::state::State;
pub struct Item<St, Codec: Encoder + Decoder> {
state: State<St>,
sink: Sink<<Codec as Encoder>::Item>,
item: <Codec as Decoder>::Item,
}
impl<St, Codec> Item<St, Codec>
where
Codec: Encoder + Decoder,
{
pub(crate) fn new(
state: State<St>,
sink: Sink<<Codec as Encoder>::Item>,
item: <Codec as Decoder>::Item,
) -> Self {
Item { state, sink, item }
}
#[inline]
pub fn state(&self) -> Ref<St> {
self.state.get_ref()
}
#[inline]
pub fn state_mut(&mut self) -> RefMut<St> {
self.state.get_mut()
}
#[inline]
pub fn sink(&self) -> &Sink<<Codec as Encoder>::Item> {
&self.sink
}
#[inline]
pub fn into_inner(self) -> <Codec as Decoder>::Item {
self.item
}
#[inline]
pub fn into_parts(
self,
) -> (
State<St>,
Sink<<Codec as Encoder>::Item>,
<Codec as Decoder>::Item,
) {
(self.state, self.sink, self.item)
}
}
impl<St, Codec> Deref for Item<St, Codec>
where
Codec: Encoder + Decoder,
{
type Target = <Codec as Decoder>::Item;
#[inline]
fn deref(&self) -> &<Codec as Decoder>::Item {
&self.item
}
}
impl<St, Codec> DerefMut for Item<St, Codec>
where
Codec: Encoder + Decoder,
{
#[inline]
fn deref_mut(&mut self) -> &mut <Codec as Decoder>::Item {
&mut self.item
}
}
impl<St, Codec> fmt::Debug for Item<St, Codec>
where
Codec: Encoder + Decoder,
<Codec as Decoder>::Item: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_tuple("FramedItem").field(&self.item).finish()
}
}

15
actix-ioframe/src/lib.rs Normal file
View File

@@ -0,0 +1,15 @@
mod cell;
mod connect;
mod dispatcher;
mod error;
mod item;
mod service;
mod sink;
mod state;
pub use self::connect::{Connect, ConnectResult};
pub use self::error::ServiceError;
pub use self::item::Item;
pub use self::service::{Builder, NewServiceBuilder, ServiceBuilder};
pub use self::sink::Sink;
pub use self::state::State;

View File

@@ -0,0 +1,363 @@
use std::marker::PhantomData;
use std::rc::Rc;
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder};
use actix_service::{IntoNewService, IntoService, NewService, Service};
use futures::{Async, Future, Poll};
use crate::connect::{Connect, ConnectResult};
use crate::dispatcher::FramedDispatcher;
use crate::error::ServiceError;
use crate::item::Item;
use crate::state::State;
type RequestItem<S, U> = Item<S, U>;
type ResponseItem<U> = Option<<U as Encoder>::Item>;
/// Service builder - structure that follows the builder pattern
/// for building instances for framed services.
pub struct Builder<St, Codec>(PhantomData<(St, Codec)>);
impl<St, Codec> Builder<St, Codec> {
pub fn new() -> Builder<St, Codec> {
Builder(PhantomData)
}
/// Construct framed handler service with specified connect service
pub fn service<Io, C, F>(self, connect: F) -> ServiceBuilder<St, C, Io, Codec>
where
F: IntoService<C>,
Io: AsyncRead + AsyncWrite,
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
Codec: Decoder + Encoder,
{
ServiceBuilder {
connect: connect.into_service(),
disconnect: None,
_t: PhantomData,
}
}
/// Construct framed handler new service with specified connect service
pub fn factory<Io, C, F>(self, connect: F) -> NewServiceBuilder<St, C, Io, Codec>
where
F: IntoNewService<C>,
Io: AsyncRead + AsyncWrite,
C: NewService<
Config = (),
Request = Connect<Io>,
Response = ConnectResult<Io, St, Codec>,
>,
C::Error: 'static,
C::Future: 'static,
Codec: Decoder + Encoder,
{
NewServiceBuilder {
connect: connect.into_new_service(),
disconnect: None,
_t: PhantomData,
}
}
}
pub struct ServiceBuilder<St, C, Io, Codec> {
connect: C,
disconnect: Option<Rc<dyn Fn(&mut St, bool)>>,
_t: PhantomData<(St, Io, Codec)>,
}
impl<St, C, Io, Codec> ServiceBuilder<St, C, Io, Codec>
where
St: 'static,
Io: AsyncRead + AsyncWrite,
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
C::Error: 'static,
Codec: Decoder + Encoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
{
/// Callback to execute on disconnect
///
/// Second parameter indicates error occured during disconnect.
pub fn disconnect<F, Out>(mut self, disconnect: F) -> Self
where
F: Fn(&mut St, bool) + 'static,
{
self.disconnect = Some(Rc::new(disconnect));
self
}
/// Provide stream items handler service and construct service factory.
pub fn finish<F, T>(
self,
service: F,
) -> impl Service<Request = Io, Response = (), Error = ServiceError<C::Error, Codec>>
where
F: IntoNewService<T>,
T: NewService<
Config = St,
Request = RequestItem<St, Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
> + 'static,
{
FramedServiceImpl {
connect: self.connect,
handler: Rc::new(service.into_new_service()),
disconnect: self.disconnect.clone(),
_t: PhantomData,
}
}
}
pub struct NewServiceBuilder<St, C, Io, Codec> {
connect: C,
disconnect: Option<Rc<dyn Fn(&mut St, bool)>>,
_t: PhantomData<(St, Io, Codec)>,
}
impl<St, C, Io, Codec> NewServiceBuilder<St, C, Io, Codec>
where
St: 'static,
Io: AsyncRead + AsyncWrite,
C: NewService<Config = (), Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
C::Error: 'static,
C::Future: 'static,
Codec: Decoder + Encoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
{
/// Callback to execute on disconnect
///
/// Second parameter indicates error occured during disconnect.
pub fn disconnect<F>(mut self, disconnect: F) -> Self
where
F: Fn(&mut St, bool) + 'static,
{
self.disconnect = Some(Rc::new(disconnect));
self
}
pub fn finish<F, T, Cfg>(
self,
service: F,
) -> impl NewService<
Config = Cfg,
Request = Io,
Response = (),
Error = ServiceError<C::Error, Codec>,
>
where
F: IntoNewService<T>,
T: NewService<
Config = St,
Request = RequestItem<St, Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
> + 'static,
{
FramedService {
connect: self.connect,
handler: Rc::new(service.into_new_service()),
disconnect: self.disconnect,
_t: PhantomData,
}
}
}
pub(crate) struct FramedService<St, C, T, Io, Codec, Cfg> {
connect: C,
handler: Rc<T>,
disconnect: Option<Rc<dyn Fn(&mut St, bool)>>,
_t: PhantomData<(St, Io, Codec, Cfg)>,
}
impl<St, C, T, Io, Codec, Cfg> NewService for FramedService<St, C, T, Io, Codec, Cfg>
where
St: 'static,
Io: AsyncRead + AsyncWrite,
C: NewService<Config = (), Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
C::Error: 'static,
C::Future: 'static,
T: NewService<
Config = St,
Request = RequestItem<St, Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
> + 'static,
Codec: Decoder + Encoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
{
type Config = Cfg;
type Request = Io;
type Response = ();
type Error = ServiceError<C::Error, Codec>;
type InitError = C::InitError;
type Service = FramedServiceImpl<St, C::Service, T, Io, Codec>;
type Future = Box<dyn Future<Item = Self::Service, Error = Self::InitError>>;
fn new_service(&self, _: &Cfg) -> Self::Future {
let handler = self.handler.clone();
let disconnect = self.disconnect.clone();
// create connect service and then create service impl
Box::new(
self.connect
.new_service(&())
.map(move |connect| FramedServiceImpl {
connect,
handler,
disconnect,
_t: PhantomData,
}),
)
}
}
pub struct FramedServiceImpl<St, C, T, Io, Codec> {
connect: C,
handler: Rc<T>,
disconnect: Option<Rc<dyn Fn(&mut St, bool)>>,
_t: PhantomData<(St, Io, Codec)>,
}
impl<St, C, T, Io, Codec> Service for FramedServiceImpl<St, C, T, Io, Codec>
where
Io: AsyncRead + AsyncWrite,
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
C::Error: 'static,
T: NewService<
Config = St,
Request = RequestItem<St, Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
<<T as NewService>::Service as Service>::Future: 'static,
Codec: Decoder + Encoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
{
type Request = Io;
type Response = ();
type Error = ServiceError<C::Error, Codec>;
type Future = FramedServiceImplResponse<St, Io, Codec, C, T>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.connect.poll_ready().map_err(|e| e.into())
}
fn call(&mut self, req: Io) -> Self::Future {
FramedServiceImplResponse {
inner: FramedServiceImplResponseInner::Connect(
self.connect.call(Connect::new(req)),
self.handler.clone(),
),
disconnect: self.disconnect.clone(),
}
}
}
pub struct FramedServiceImplResponse<St, Io, Codec, C, T>
where
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
C::Error: 'static,
T: NewService<
Config = St,
Request = RequestItem<St, Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
<<T as NewService>::Service as Service>::Future: 'static,
Io: AsyncRead + AsyncWrite,
Codec: Encoder + Decoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
{
inner: FramedServiceImplResponseInner<St, Io, Codec, C, T>,
disconnect: Option<Rc<dyn Fn(&mut St, bool)>>,
}
enum FramedServiceImplResponseInner<St, Io, Codec, C, T>
where
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
C::Error: 'static,
T: NewService<
Config = St,
Request = RequestItem<St, Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
<<T as NewService>::Service as Service>::Future: 'static,
Io: AsyncRead + AsyncWrite,
Codec: Encoder + Decoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
{
Connect(C::Future, Rc<T>),
Handler(T::Future, Option<ConnectResult<Io, St, Codec>>),
Dispatcher(FramedDispatcher<St, T::Service, Io, Codec>),
}
impl<St, Io, Codec, C, T> Future for FramedServiceImplResponse<St, Io, Codec, C, T>
where
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
C::Error: 'static,
T: NewService<
Config = St,
Request = RequestItem<St, Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
<<T as NewService>::Service as Service>::Future: 'static,
Io: AsyncRead + AsyncWrite,
Codec: Encoder + Decoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
{
type Item = ();
type Error = ServiceError<C::Error, Codec>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.inner {
FramedServiceImplResponseInner::Connect(ref mut fut, ref handler) => {
match fut.poll()? {
Async::Ready(res) => {
self.inner = FramedServiceImplResponseInner::Handler(
handler.new_service(&res.state),
Some(res),
);
self.poll()
}
Async::NotReady => Ok(Async::NotReady),
}
}
FramedServiceImplResponseInner::Handler(ref mut fut, ref mut res) => {
match fut.poll()? {
Async::Ready(handler) => {
let res = res.take().unwrap();
self.inner =
FramedServiceImplResponseInner::Dispatcher(FramedDispatcher::new(
res.framed,
State::new(res.state),
handler,
res.rx,
res.sink,
self.disconnect.clone(),
));
self.poll()
}
Async::NotReady => Ok(Async::NotReady),
}
}
FramedServiceImplResponseInner::Dispatcher(ref mut fut) => fut.poll(),
}
}
}

44
actix-ioframe/src/sink.rs Normal file
View File

@@ -0,0 +1,44 @@
use std::fmt;
use futures::unsync::{mpsc, oneshot};
use futures::Future;
use crate::dispatcher::FramedMessage;
pub struct Sink<T>(mpsc::UnboundedSender<FramedMessage<T>>);
impl<T> Clone for Sink<T> {
fn clone(&self) -> Self {
Sink(self.0.clone())
}
}
impl<T> Sink<T> {
pub(crate) fn new(tx: mpsc::UnboundedSender<FramedMessage<T>>) -> Self {
Sink(tx)
}
/// Close connection
pub fn close(&self) {
let _ = self.0.unbounded_send(FramedMessage::Close);
}
/// Close connection
pub fn wait_close(&self) -> impl Future<Item = (), Error = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.0.unbounded_send(FramedMessage::WaitClose(tx));
rx.map_err(|_| ())
}
/// Send item
pub fn send(&self, item: T) {
let _ = self.0.unbounded_send(FramedMessage::Message(item));
}
}
impl<T> fmt::Debug for Sink<T> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Sink").finish()
}
}

View File

@@ -0,0 +1,30 @@
use std::cell::{Ref, RefCell, RefMut};
use std::rc::Rc;
/// Connection state
///
/// Connection state is an arbitrary data attached to the each incoming message.
#[derive(Debug)]
pub struct State<T>(Rc<RefCell<T>>);
impl<T> State<T> {
pub(crate) fn new(st: T) -> Self {
State(Rc::new(RefCell::new(st)))
}
#[inline]
pub fn get_ref(&self) -> Ref<T> {
self.0.borrow()
}
#[inline]
pub fn get_mut(&mut self) -> RefMut<T> {
self.0.borrow_mut()
}
}
impl<T> Clone for State<T> {
fn clone(&self) -> Self {
State(self.0.clone())
}
}

View File

@@ -0,0 +1,60 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use actix_codec::BytesCodec;
use actix_server_config::Io;
use actix_service::{new_apply_fn, Service};
use actix_test_server::TestServer;
use futures::Future;
use tokio_tcp::TcpStream;
use tokio_timer::sleep;
use actix_ioframe::{Builder, Connect};
struct State;
#[test]
fn test_disconnect() -> std::io::Result<()> {
let disconnect = Arc::new(AtomicBool::new(false));
let disconnect1 = disconnect.clone();
let mut srv = TestServer::with(move || {
let disconnect1 = disconnect1.clone();
new_apply_fn(
Builder::new()
.factory(|conn: Connect<_>| Ok(conn.codec(BytesCodec).state(State)))
.disconnect(move |_, _| {
disconnect1.store(true, Ordering::Relaxed);
})
.finish(|_t| Ok(None)),
|io: Io<TcpStream>, srv| srv.call(io.into_parts().0),
)
});
let mut client = Builder::new()
.service(|conn: Connect<_>| {
let conn = conn.codec(BytesCodec).state(State);
conn.sink().close();
Ok(conn)
})
.finish(|_t| Ok(None));
let conn = srv
.block_on(
actix_connect::default_connector()
.call(actix_connect::Connect::with(String::new(), srv.addr())),
)
.unwrap();
srv.block_on(client.call(conn.into_parts().0)).unwrap();
let _ = srv.block_on(
sleep(Duration::from_millis(100))
.map(|_| ())
.map_err(|_| ()),
);
assert!(disconnect.load(Ordering::Relaxed));
Ok(())
}

View File

@@ -1,11 +1,33 @@
# Changes
## [0.2.5] - 2019-09-02
### Added
* Add arbiter specific storage
## [0.2.4] - 2019-07-17
### Changed
* Avoid a copy of the Future when initializing the Box. #29
## [0.2.3] - 2019-06-22
### Added
* Allow to start System using exsiting CurrentThread Handle #22
## [0.2.2] - 2019-03-28
### Changed
* Moved `blocking` module to `actix-threadpool` crate
## [0.2.1] - 2019-03-11
### Added
@@ -16,12 +38,14 @@
* Arbiter::exec - execute fn on the arbiter's thread and wait result
## [0.2.0] - 2019-03-06
* `run` method returns `io::Result<()>`
* Removed `Handle`
## [0.1.0] - 2018-12-09
* Initial release

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-rt"
version = "0.2.2"
version = "0.2.5"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix runtime"
keywords = ["network", "framework", "async", "futures"]
@@ -18,9 +18,10 @@ name = "actix_rt"
path = "src/lib.rs"
[dependencies]
actix-threadpool = "0.1.0"
actix-threadpool = "0.1.1"
futures = "0.1.25"
tokio-current-thread = "0.1"
tokio-executor = "0.1.5"
tokio-reactor = "0.1.7"
tokio-timer = "0.2.8"
copyless = "0.1.4"

1
actix-rt/LICENSE-APACHE Symbolic link
View File

@@ -0,0 +1 @@
../LICENSE-APACHE

1
actix-rt/LICENSE-MIT Symbolic link
View File

@@ -0,0 +1 @@
../LICENSE-MIT

View File

@@ -1,3 +1,4 @@
use std::any::{Any, TypeId};
use std::cell::{Cell, RefCell};
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
@@ -11,18 +12,21 @@ use tokio_current_thread::spawn;
use crate::builder::Builder;
use crate::system::System;
use copyless::BoxHelper;
thread_local!(
static ADDR: RefCell<Option<Arbiter>> = RefCell::new(None);
static RUNNING: Cell<bool> = Cell::new(false);
static Q: RefCell<Vec<Box<Future<Item = (), Error = ()>>>> = RefCell::new(Vec::new());
static Q: RefCell<Vec<Box<dyn Future<Item = (), Error = ()>>>> = RefCell::new(Vec::new());
static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new());
);
pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0);
pub(crate) enum ArbiterCommand {
Stop,
Execute(Box<Future<Item = (), Error = ()> + Send>),
ExecuteFn(Box<FnExec>),
Execute(Box<dyn Future<Item = (), Error = ()> + Send>),
ExecuteFn(Box<dyn FnExec>),
}
impl fmt::Debug for ArbiterCommand {
@@ -54,6 +58,7 @@ impl Arbiter {
let arb = Arbiter(tx);
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
RUNNING.with(|cell| cell.set(false));
STORAGE.with(|cell| cell.borrow_mut().clear());
Arbiter::spawn(ArbiterController { stop: None, rx });
arb
@@ -88,6 +93,7 @@ impl Arbiter {
let (stop, stop_rx) = channel();
RUNNING.with(|cell| cell.set(true));
STORAGE.with(|cell| cell.borrow_mut().clear());
System::set_current(sys);
@@ -141,9 +147,9 @@ impl Arbiter {
{
RUNNING.with(move |cell| {
if cell.get() {
spawn(Box::new(future));
spawn(Box::alloc().init(future));
} else {
Q.with(move |cell| cell.borrow_mut().push(Box::new(future)));
Q.with(move |cell| cell.borrow_mut().push(Box::alloc().init(future)));
}
});
}
@@ -178,7 +184,7 @@ impl Arbiter {
let _ = self
.0
.unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || {
let _ = f();
f();
})));
}
@@ -200,6 +206,50 @@ impl Arbiter {
})));
rx
}
/// Set item to arbiter storage
pub fn set_item<T: 'static>(item: T) {
STORAGE.with(move |cell| cell.borrow_mut().insert(TypeId::of::<T>(), Box::new(item)));
}
/// Check if arbiter storage contains item
pub fn contains_item<T: 'static>() -> bool {
STORAGE.with(move |cell| cell.borrow().get(&TypeId::of::<T>()).is_some())
}
/// Get a reference to a type previously inserted on this arbiter's storage.
///
/// Panics is item is not inserted
pub fn get_item<T: 'static, F, R>(mut f: F) -> R
where
F: FnMut(&T) -> R,
{
STORAGE.with(move |cell| {
let st = cell.borrow();
let item = st
.get(&TypeId::of::<T>())
.and_then(|boxed| (&**boxed as &(dyn Any + 'static)).downcast_ref())
.unwrap();
f(item)
})
}
/// Get a mutable reference to a type previously inserted on this arbiter's storage.
///
/// Panics is item is not inserted
pub fn get_mut_item<T: 'static, F, R>(mut f: F) -> R
where
F: FnMut(&mut T) -> R,
{
STORAGE.with(move |cell| {
let mut st = cell.borrow_mut();
let item = st
.get_mut(&TypeId::of::<T>())
.and_then(|boxed| (&mut **boxed as &mut (dyn Any + 'static)).downcast_mut())
.unwrap();
f(item)
})
}
}
struct ArbiterController {
@@ -310,7 +360,7 @@ impl<F> FnExec for F
where
F: FnOnce() + Send + 'static,
{
#[cfg_attr(feature = "cargo-clippy", allow(boxed_local))]
#[allow(clippy::boxed_local)]
fn call_box(self: Box<Self>) {
(*self)()
}

View File

@@ -1,11 +1,12 @@
use std::borrow::Cow;
use std::io;
use futures::future;
use futures::future::{lazy, Future};
use futures::sync::mpsc::unbounded;
use futures::sync::oneshot::{channel, Receiver};
use tokio_current_thread::CurrentThread;
use tokio_current_thread::{CurrentThread, Handle};
use tokio_reactor::Reactor;
use tokio_timer::clock::Clock;
use tokio_timer::timer::Timer;
@@ -69,6 +70,13 @@ impl Builder {
self.create_runtime(|| {})
}
/// Create new System that can run asynchronously.
///
/// This method panics if it cannot start the system arbiter
pub(crate) fn build_async(self, executor: Handle) -> AsyncSystemRunner {
self.create_async_runtime(executor)
}
/// This function will start tokio runtime and will finish once the
/// `System::stop()` message get called.
/// Function `f` get called within tokio runtime context.
@@ -79,6 +87,21 @@ impl Builder {
self.create_runtime(f).run()
}
fn create_async_runtime(self, executor: Handle) -> AsyncSystemRunner {
let (stop_tx, stop) = channel();
let (sys_sender, sys_receiver) = unbounded();
let system = System::construct(sys_sender, Arbiter::new_system(), self.stop_on_panic);
// system arbiter
let arb = SystemArbiter::new(stop_tx, sys_receiver);
// start the system arbiter
executor.spawn(arb).expect("could not start system arbiter");
AsyncSystemRunner { stop, system }
}
fn create_runtime<F>(self, f: F) -> SystemRunner
where
F: FnOnce() + 'static,
@@ -127,6 +150,42 @@ impl Builder {
}
}
#[derive(Debug)]
pub(crate) struct AsyncSystemRunner {
stop: Receiver<i32>,
system: System,
}
impl AsyncSystemRunner {
/// This function will start event loop and returns a future that
/// resolves once the `System::stop()` function is called.
pub(crate) fn run_nonblocking(self) -> impl Future<Item = (), Error = io::Error> + Send {
let AsyncSystemRunner { stop, .. } = self;
// run loop
future::lazy(|| {
Arbiter::run_system();
stop.then(|res| match res {
Ok(code) => {
if code != 0 {
Err(io::Error::new(
io::ErrorKind::Other,
format!("Non-zero exit code: {}", code),
))
} else {
Ok(())
}
}
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
})
.then(|result| {
Arbiter::stop_system();
result
})
})
}
}
/// Helper object that runs System's event loop
#[must_use = "SystemRunner must be run"]
#[derive(Debug)]

View File

@@ -40,7 +40,7 @@ impl Error for RunError {
fn description(&self) -> &str {
self.inner.description()
}
fn cause(&self) -> Option<&Error> {
fn cause(&self) -> Option<&dyn Error> {
self.inner.source()
}
}

View File

@@ -3,6 +3,8 @@ use std::io;
use std::sync::atomic::{AtomicUsize, Ordering};
use futures::sync::mpsc::UnboundedSender;
use futures::Future;
use tokio_current_thread::Handle;
use crate::arbiter::{Arbiter, SystemCommand};
use crate::builder::{Builder, SystemRunner};
@@ -55,6 +57,20 @@ impl System {
Self::builder().name(name).build()
}
#[allow(clippy::new_ret_no_self)]
/// Create new system using provided CurrentThread Handle.
///
/// This method panics if it can not spawn system arbiter
pub fn run_in_executor<T: Into<String>>(
name: T,
executor: Handle,
) -> impl Future<Item = (), Error = io::Error> + Send {
Self::builder()
.name(name)
.build_async(executor)
.run_nonblocking()
}
/// Get current running system.
pub fn current() -> System {
CURRENT.with(|cell| match *cell.borrow() {

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-server-config"
version = "0.1.1"
version = "0.1.2"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix server config utils"
homepage = "https://actix.rs"
@@ -14,7 +14,7 @@ name = "actix_server_config"
path = "src/lib.rs"
[package.metadata.docs.rs]
features = ["ssl", "rust-tls"]
features = ["ssl", "rust-tls", "uds"]
[features]
default = []
@@ -25,6 +25,9 @@ ssl = ["tokio-openssl"]
# rustls
rust-tls = ["rustls", "tokio-rustls"]
# unix domain sockets
uds = ["tokio-uds"]
[dependencies]
futures = "0.1.25"
tokio-io = "0.1.12"
@@ -32,3 +35,4 @@ tokio-tcp = "0.1"
tokio-openssl = { version="0.3.0", optional = true }
rustls = { version = "0.15.2", optional = true }
tokio-rustls = { version = "0.9.1", optional = true }
tokio-uds = { version="0.2.5", optional = true }

View File

@@ -0,0 +1 @@
../LICENSE-APACHE

View File

@@ -0,0 +1 @@
../LICENSE-MIT

View File

@@ -1,5 +1,12 @@
# Changes
## [0.1.2] - 2019-07-18
### Added
* Add unix domnain sockets support
## [0.1.1] - 2019-04-16
### Added

View File

@@ -95,9 +95,9 @@ impl<T, P> Io<T, P> {
/// Return new Io object with new parameter.
pub fn set<U>(self, params: U) -> Io<T, U> {
Io {
params,
io: self.io,
proto: self.proto,
params: params,
}
}
@@ -216,3 +216,26 @@ impl<T: IoStream> IoStream for tokio_rustls::TlsStream<T, rustls::ServerSession>
self.get_mut().0.set_keepalive(dur)
}
}
#[cfg(all(unix, feature = "uds"))]
impl IoStream for tokio_uds::UnixStream {
#[inline]
fn peer_addr(&self) -> Option<net::SocketAddr> {
None
}
#[inline]
fn set_nodelay(&mut self, _: bool) -> io::Result<()> {
Ok(())
}
#[inline]
fn set_linger(&mut self, _: Option<time::Duration>) -> io::Result<()> {
Ok(())
}
#[inline]
fn set_keepalive(&mut self, _: Option<time::Duration>) -> io::Result<()> {
Ok(())
}
}

View File

@@ -1,5 +1,19 @@
# Changes
## [0.6.0] - 2019-07-18
### Added
* Support Unix domain sockets #3
## [0.5.1] - 2019-05-18
### Changed
* ServerBuilder::shutdown_timeout() accepts u64
## [0.5.0] - 2019-05-12
### Added

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-server"
version = "0.5.0"
version = "0.6.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix server - General purpose tcp server"
keywords = ["network", "framework", "async", "futures"]
@@ -14,7 +14,7 @@ edition = "2018"
workspace = ".."
[package.metadata.docs.rs]
features = ["ssl", "tls", "rust-tls"]
features = ["ssl", "tls", "rust-tls", "uds"]
[lib]
name = "actix_server"
@@ -32,15 +32,18 @@ ssl = ["openssl", "tokio-openssl", "actix-server-config/ssl"]
# rustls
rust-tls = ["rustls", "tokio-rustls", "webpki", "webpki-roots", "actix-server-config/rust-tls"]
# uds
uds = ["mio-uds", "tokio-uds", "actix-server-config/uds"]
[dependencies]
actix-rt = "0.2.1"
actix-service = "0.4.0"
actix-server-config = "0.1.1"
actix-rt = "0.2.2"
actix-service = "0.4.1"
actix-server-config = "0.1.2"
log = "0.4"
num_cpus = "1.0"
mio = "0.6.13"
mio = "0.6.19"
net2 = "0.2"
futures = "0.1"
slab = "0.4"
@@ -50,6 +53,10 @@ tokio-timer = "0.2.8"
tokio-reactor = "0.1"
tokio-signal = "0.2"
# unix domain sockets
mio-uds = { version="0.6.7", optional = true }
tokio-uds = { version="0.2.5", optional = true }
# native-tls
native-tls = { version="0.2", optional = true }
@@ -57,7 +64,7 @@ native-tls = { version="0.2", optional = true }
openssl = { version="0.10", optional = true }
tokio-openssl = { version="0.3", optional = true }
#rustls
# rustls
rustls = { version = "0.15.2", optional = true }
tokio-rustls = { version = "0.9.1", optional = true }
webpki = { version = "0.19", optional = true }

1
actix-server/LICENSE-APACHE Symbolic link
View File

@@ -0,0 +1 @@
../LICENSE-APACHE

1
actix-server/LICENSE-MIT Symbolic link
View File

@@ -0,0 +1 @@
../LICENSE-MIT

View File

@@ -1,17 +1,17 @@
use std::sync::mpsc as sync_mpsc;
use std::time::{Duration, Instant};
use std::{io, net, thread};
use std::{io, thread};
use actix_rt::System;
use futures::future::{lazy, Future};
use log::{error, info};
use mio;
use slab::Slab;
use tokio_timer::Delay;
use super::server::Server;
use super::worker::{Conn, WorkerClient};
use super::Token;
use crate::server::Server;
use crate::socket::{SocketAddr, SocketListener, StdListener};
use crate::worker::{Conn, WorkerClient};
use crate::Token;
pub(crate) enum Command {
Pause,
@@ -21,9 +21,9 @@ pub(crate) enum Command {
}
struct ServerSocketInfo {
addr: net::SocketAddr,
addr: SocketAddr,
token: Token,
sock: mio::net::TcpListener,
sock: SocketListener,
timeout: Option<Instant>,
}
@@ -84,7 +84,7 @@ impl AcceptLoop {
pub(crate) fn start(
&mut self,
socks: Vec<(Token, net::TcpListener)>,
socks: Vec<(Token, StdListener)>,
workers: Vec<WorkerClient>,
) {
let srv = self.srv.take().expect("Can not re-use AcceptInfo");
@@ -135,7 +135,7 @@ impl Accept {
rx: sync_mpsc::Receiver<Command>,
cmd_reg: mio::Registration,
notify_reg: mio::Registration,
socks: Vec<(Token, net::TcpListener)>,
socks: Vec<(Token, StdListener)>,
srv: Server,
workers: Vec<WorkerClient>,
) {
@@ -174,7 +174,7 @@ impl Accept {
fn new(
rx: sync_mpsc::Receiver<Command>,
socks: Vec<(Token, net::TcpListener)>,
socks: Vec<(Token, StdListener)>,
workers: Vec<WorkerClient>,
srv: Server,
) -> Accept {
@@ -187,10 +187,9 @@ impl Accept {
// Start accept
let mut sockets = Slab::new();
for (hnd_token, lst) in socks.into_iter() {
let addr = lst.local_addr().unwrap();
let server = mio::net::TcpListener::from_std(lst)
.expect("Can not create mio::net::TcpListener");
let addr = lst.local_addr();
let server = lst.into_listener();
let entry = sockets.vacant_entry();
let token = entry.key();
@@ -422,12 +421,13 @@ impl Accept {
fn accept(&mut self, token: usize) {
loop {
let msg = if let Some(info) = self.sockets.get_mut(token) {
match info.sock.accept_std() {
Ok((io, addr)) => Conn {
match info.sock.accept() {
Ok(Some((io, addr))) => Conn {
io,
token: info.token,
peer: Some(addr),
},
Ok(None) => return,
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return,
Err(ref e) if connection_error(e) => continue,
Err(e) => {

View File

@@ -9,6 +9,7 @@ use futures::{Async, Future, Poll, Stream};
use log::{error, info};
use net2::TcpBuilder;
use num_cpus;
use tokio_tcp::TcpStream;
use tokio_timer::sleep;
use crate::accept::{AcceptLoop, AcceptNotify, Command};
@@ -16,6 +17,7 @@ use crate::config::{ConfiguredService, ServiceConfig};
use crate::server::{Server, ServerCommand};
use crate::services::{InternalServiceFactory, ServiceFactory, StreamNewService};
use crate::signals::{Signal, Signals};
use crate::socket::StdListener;
use crate::worker::{self, Worker, WorkerAvailability, WorkerClient};
use crate::{ssl, Token};
@@ -25,8 +27,8 @@ pub struct ServerBuilder {
token: Token,
backlog: i32,
workers: Vec<(usize, WorkerClient)>,
services: Vec<Box<InternalServiceFactory>>,
sockets: Vec<(Token, net::TcpListener)>,
services: Vec<Box<dyn InternalServiceFactory>>,
sockets: Vec<(Token, StdListener)>,
accept: AcceptLoop,
exit: bool,
shutdown_timeout: Duration,
@@ -128,8 +130,8 @@ impl ServerBuilder {
/// dropped.
///
/// By default shutdown timeout sets to 30 seconds.
pub fn shutdown_timeout(mut self, sec: u16) -> Self {
self.shutdown_timeout = Duration::from_secs(u64::from(sec));
pub fn shutdown_timeout(mut self, sec: u64) -> Self {
self.shutdown_timeout = Duration::from_secs(sec);
self
}
@@ -151,7 +153,7 @@ impl ServerBuilder {
for (name, lst) in cfg.services {
let token = self.token.next();
srv.stream(token, name, lst.local_addr()?);
self.sockets.push((token, lst));
self.sockets.push((token, StdListener::Tcp(lst)));
}
self.services.push(Box::new(srv));
}
@@ -163,7 +165,7 @@ impl ServerBuilder {
/// Add new service to the server.
pub fn bind<F, U, N: AsRef<str>>(mut self, name: N, addr: U, factory: F) -> io::Result<Self>
where
F: ServiceFactory,
F: ServiceFactory<TcpStream>,
U: net::ToSocketAddrs,
{
let sockets = bind_addr(addr, self.backlog)?;
@@ -176,11 +178,39 @@ impl ServerBuilder {
factory.clone(),
lst.local_addr()?,
));
self.sockets.push((token, lst));
self.sockets.push((token, StdListener::Tcp(lst)));
}
Ok(self)
}
#[cfg(all(unix, feature = "uds"))]
/// Add new unix domain service to the server.
pub fn bind_uds<F, U, N>(mut self, name: N, addr: U, factory: F) -> io::Result<Self>
where
F: ServiceFactory<tokio_uds::UnixStream>,
N: AsRef<str>,
U: AsRef<std::path::Path>,
{
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::os::unix::net::UnixListener;
// TODO: need to do something with existing paths
let _ = std::fs::remove_file(addr.as_ref());
let lst = UnixListener::bind(addr)?;
let token = self.token.next();
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
self.services.push(StreamNewService::create(
name.as_ref().to_string(),
token,
factory.clone(),
addr,
));
self.sockets.push((token, StdListener::Uds(lst)));
Ok(self)
}
/// Add new service to the server.
pub fn listen<F, N: AsRef<str>>(
mut self,
@@ -189,7 +219,7 @@ impl ServerBuilder {
factory: F,
) -> io::Result<Self>
where
F: ServiceFactory,
F: ServiceFactory<TcpStream>,
{
let token = self.token.next();
self.services.push(StreamNewService::create(
@@ -198,7 +228,7 @@ impl ServerBuilder {
factory,
lst.local_addr()?,
));
self.sockets.push((token, lst));
self.sockets.push((token, StdListener::Tcp(lst)));
Ok(self)
}
@@ -243,7 +273,7 @@ impl ServerBuilder {
// start accept thread
for sock in &self.sockets {
info!("Starting server on {}", sock.1.local_addr().ok().unwrap());
info!("Starting server on {}", sock.1);
}
self.accept
.start(mem::replace(&mut self.sockets, Vec::new()), workers);
@@ -266,7 +296,7 @@ impl ServerBuilder {
let timeout = self.shutdown_timeout;
let avail = WorkerAvailability::new(notify);
let worker = WorkerClient::new(idx, tx1, tx2, avail.clone());
let services: Vec<Box<InternalServiceFactory>> =
let services: Vec<Box<dyn InternalServiceFactory>> =
self.services.iter().map(|v| v.clone_factory()).collect();
Arbiter::new().send(lazy(move || {

View File

@@ -17,7 +17,7 @@ use super::Token;
pub struct ServiceConfig {
pub(crate) services: Vec<(String, net::TcpListener)>,
pub(crate) apply: Option<Box<ServiceRuntimeConfiguration>>,
pub(crate) apply: Option<Box<dyn ServiceRuntimeConfiguration>>,
pub(crate) threads: usize,
pub(crate) backlog: i32,
}
@@ -75,13 +75,13 @@ impl ServiceConfig {
}
pub(super) struct ConfiguredService {
rt: Box<ServiceRuntimeConfiguration>,
rt: Box<dyn ServiceRuntimeConfiguration>,
names: HashMap<Token, (String, net::SocketAddr)>,
services: HashMap<String, Token>,
}
impl ConfiguredService {
pub(super) fn new(rt: Box<ServiceRuntimeConfiguration>) -> Self {
pub(super) fn new(rt: Box<dyn ServiceRuntimeConfiguration>) -> Self {
ConfiguredService {
rt,
names: HashMap::new(),
@@ -100,7 +100,7 @@ impl InternalServiceFactory for ConfiguredService {
&self.names[&token].0
}
fn clone_factory(&self) -> Box<InternalServiceFactory> {
fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
Box::new(Self {
rt: self.rt.clone(),
names: self.names.clone(),
@@ -108,7 +108,7 @@ impl InternalServiceFactory for ConfiguredService {
})
}
fn create(&self) -> Box<Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> {
fn create(&self) -> Box<dyn Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> {
// configure services
let mut rt = ServiceRuntime::new(self.services.clone());
self.rt.configure(&mut rt);
@@ -156,7 +156,7 @@ impl InternalServiceFactory for ConfiguredService {
}
pub(super) trait ServiceRuntimeConfiguration: Send {
fn clone(&self) -> Box<ServiceRuntimeConfiguration>;
fn clone(&self) -> Box<dyn ServiceRuntimeConfiguration>;
fn configure(&self, rt: &mut ServiceRuntime);
}
@@ -165,7 +165,7 @@ impl<F> ServiceRuntimeConfiguration for F
where
F: Fn(&mut ServiceRuntime) + Send + Clone + 'static,
{
fn clone(&self) -> Box<ServiceRuntimeConfiguration> {
fn clone(&self) -> Box<dyn ServiceRuntimeConfiguration> {
Box::new(self.clone())
}
@@ -181,7 +181,7 @@ fn not_configured(_: &mut ServiceRuntime) {
pub struct ServiceRuntime {
names: HashMap<String, Token>,
services: HashMap<Token, BoxedNewService>,
onstart: Vec<Box<Future<Item = (), Error = ()>>>,
onstart: Vec<Box<dyn Future<Item = (), Error = ()>>>,
}
impl ServiceRuntime {
@@ -236,14 +236,14 @@ impl ServiceRuntime {
}
type BoxedNewService = Box<
NewService<
dyn NewService<
Request = (Option<CounterGuard>, ServerMessage),
Response = (),
Error = (),
InitError = (),
Config = ServerConfig,
Service = BoxedServerService,
Future = Box<Future<Item = BoxedServerService, Error = ()>>,
Future = Box<dyn Future<Item = BoxedServerService, Error = ()>>,
>,
>;
@@ -265,7 +265,7 @@ where
type InitError = ();
type Config = ServerConfig;
type Service = BoxedServerService;
type Future = Box<Future<Item = BoxedServerService, Error = ()>>;
type Future = Box<dyn Future<Item = BoxedServerService, Error = ()>>;
fn new_service(&self, cfg: &ServerConfig) -> Self::Future {
Box::new(self.inner.new_service(cfg).map_err(|_| ()).map(|s| {

View File

@@ -7,6 +7,7 @@ mod counter;
mod server;
mod services;
mod signals;
mod socket;
pub mod ssl;
mod worker;
@@ -17,6 +18,9 @@ pub use self::config::{ServiceConfig, ServiceRuntime};
pub use self::server::Server;
pub use self::services::ServiceFactory;
#[doc(hidden)]
pub use self::socket::FromStream;
#[doc(hidden)]
pub use self::services::ServiceFactory as StreamServiceFactory;

View File

@@ -1,4 +1,5 @@
use std::net::{self, SocketAddr};
use std::marker::PhantomData;
use std::net::SocketAddr;
use std::time::Duration;
use actix_rt::spawn;
@@ -7,24 +8,23 @@ use actix_service::{NewService, Service};
use futures::future::{err, ok, FutureResult};
use futures::{Future, Poll};
use log::error;
use tokio_reactor::Handle;
use tokio_tcp::TcpStream;
use super::Token;
use crate::counter::CounterGuard;
use crate::socket::{FromStream, StdStream};
/// Server message
pub(crate) enum ServerMessage {
/// New stream
Connect(net::TcpStream),
Connect(StdStream),
/// Gracefull shutdown
Shutdown(Duration),
/// Force shutdown
ForceShutdown,
}
pub trait ServiceFactory: Send + Clone + 'static {
type NewService: NewService<Config = ServerConfig, Request = Io<TcpStream>>;
pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static {
type NewService: NewService<Config = ServerConfig, Request = Io<Stream>>;
fn create(&self) -> Self::NewService;
}
@@ -32,13 +32,13 @@ pub trait ServiceFactory: Send + Clone + 'static {
pub(crate) trait InternalServiceFactory: Send {
fn name(&self, token: Token) -> &str;
fn clone_factory(&self) -> Box<InternalServiceFactory>;
fn clone_factory(&self) -> Box<dyn InternalServiceFactory>;
fn create(&self) -> Box<Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>>;
fn create(&self) -> Box<dyn Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>>;
}
pub(crate) type BoxedServerService = Box<
Service<
dyn Service<
Request = (Option<CounterGuard>, ServerMessage),
Response = (),
Error = (),
@@ -56,11 +56,12 @@ impl<T> StreamService<T> {
}
}
impl<T> Service for StreamService<T>
impl<T, I> Service for StreamService<T>
where
T: Service<Request = Io<TcpStream>>,
T: Service<Request = Io<I>>,
T::Future: 'static,
T::Error: 'static,
I: FromStream,
{
type Request = (Option<CounterGuard>, ServerMessage);
type Response = ();
@@ -74,7 +75,7 @@ where
fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future {
match req {
ServerMessage::Connect(stream) => {
let stream = TcpStream::from_std(stream, &Handle::default()).map_err(|e| {
let stream = FromStream::from_stdstream(stream).map_err(|e| {
error!("Can not convert to an async tcp stream: {}", e);
});
@@ -93,50 +94,55 @@ where
}
}
pub(crate) struct StreamNewService<F: ServiceFactory> {
pub(crate) struct StreamNewService<F: ServiceFactory<Io>, Io: FromStream> {
name: String,
inner: F,
token: Token,
addr: SocketAddr,
_t: PhantomData<Io>,
}
impl<F> StreamNewService<F>
impl<F, Io> StreamNewService<F, Io>
where
F: ServiceFactory,
F: ServiceFactory<Io>,
Io: FromStream + Send + 'static,
{
pub(crate) fn create(
name: String,
token: Token,
inner: F,
addr: SocketAddr,
) -> Box<InternalServiceFactory> {
) -> Box<dyn InternalServiceFactory> {
Box::new(Self {
name,
token,
inner,
addr,
_t: PhantomData,
})
}
}
impl<F> InternalServiceFactory for StreamNewService<F>
impl<F, Io> InternalServiceFactory for StreamNewService<F, Io>
where
F: ServiceFactory,
F: ServiceFactory<Io>,
Io: FromStream + Send + 'static,
{
fn name(&self, _: Token) -> &str {
&self.name
}
fn clone_factory(&self) -> Box<InternalServiceFactory> {
fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
Box::new(Self {
name: self.name.clone(),
inner: self.inner.clone(),
token: self.token,
addr: self.addr,
_t: PhantomData,
})
}
fn create(&self) -> Box<Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> {
fn create(&self) -> Box<dyn Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> {
let token = self.token;
let config = ServerConfig::new(self.addr);
Box::new(
@@ -152,24 +158,25 @@ where
}
}
impl InternalServiceFactory for Box<InternalServiceFactory> {
impl InternalServiceFactory for Box<dyn InternalServiceFactory> {
fn name(&self, token: Token) -> &str {
self.as_ref().name(token)
}
fn clone_factory(&self) -> Box<InternalServiceFactory> {
fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
self.as_ref().clone_factory()
}
fn create(&self) -> Box<Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> {
fn create(&self) -> Box<dyn Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> {
self.as_ref().create()
}
}
impl<F, T> ServiceFactory for F
impl<F, T, I> ServiceFactory<I> for F
where
F: Fn() -> T + Send + Clone + 'static,
T: NewService<Config = ServerConfig, Request = Io<TcpStream>>,
T: NewService<Config = ServerConfig, Request = Io<I>>,
I: FromStream,
{
type NewService = T;

View File

@@ -27,7 +27,7 @@ pub(crate) struct Signals {
streams: Vec<SigStream>,
}
type SigStream = Box<Stream<Item = Signal, Error = io::Error>>;
type SigStream = Box<dyn Stream<Item = Signal, Error = io::Error>>;
impl Signals {
pub(crate) fn start(srv: Server) {
@@ -46,7 +46,7 @@ impl Signals {
{
use tokio_signal::unix;
let mut sigs: Vec<Box<Future<Item = SigStream, Error = io::Error>>> =
let mut sigs: Vec<Box<dyn Future<Item = SigStream, Error = io::Error>>> =
Vec::new();
sigs.push(Box::new(
tokio_signal::unix::Signal::new(tokio_signal::unix::SIGINT).map(|stream| {

173
actix-server/src/socket.rs Normal file
View File

@@ -0,0 +1,173 @@
use std::{fmt, io, net};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_reactor::Handle;
use tokio_tcp::TcpStream;
pub(crate) enum StdListener {
Tcp(net::TcpListener),
#[cfg(all(unix, feature = "uds"))]
Uds(std::os::unix::net::UnixListener),
}
pub(crate) enum SocketAddr {
Tcp(net::SocketAddr),
#[cfg(all(unix, feature = "uds"))]
Uds(std::os::unix::net::SocketAddr),
}
impl fmt::Display for SocketAddr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
SocketAddr::Tcp(ref addr) => write!(f, "{}", addr),
#[cfg(all(unix, feature = "uds"))]
SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr),
}
}
}
impl fmt::Debug for SocketAddr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
SocketAddr::Tcp(ref addr) => write!(f, "{:?}", addr),
#[cfg(all(unix, feature = "uds"))]
SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr),
}
}
}
impl fmt::Display for StdListener {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
StdListener::Tcp(ref lst) => write!(f, "{}", lst.local_addr().ok().unwrap()),
#[cfg(all(unix, feature = "uds"))]
StdListener::Uds(ref lst) => write!(f, "{:?}", lst.local_addr().ok().unwrap()),
}
}
}
impl StdListener {
pub(crate) fn local_addr(&self) -> SocketAddr {
match self {
StdListener::Tcp(lst) => SocketAddr::Tcp(lst.local_addr().unwrap()),
#[cfg(all(unix, feature = "uds"))]
StdListener::Uds(lst) => SocketAddr::Uds(lst.local_addr().unwrap()),
}
}
pub(crate) fn into_listener(self) -> SocketListener {
match self {
StdListener::Tcp(lst) => SocketListener::Tcp(
mio::net::TcpListener::from_std(lst)
.expect("Can not create mio::net::TcpListener"),
),
#[cfg(all(unix, feature = "uds"))]
StdListener::Uds(lst) => SocketListener::Uds(
mio_uds::UnixListener::from_listener(lst)
.expect("Can not create mio_uds::UnixListener"),
),
}
}
}
#[derive(Debug)]
pub enum StdStream {
Tcp(std::net::TcpStream),
#[cfg(all(unix, feature = "uds"))]
Uds(std::os::unix::net::UnixStream),
}
pub(crate) enum SocketListener {
Tcp(mio::net::TcpListener),
#[cfg(all(unix, feature = "uds"))]
Uds(mio_uds::UnixListener),
}
impl SocketListener {
pub(crate) fn accept(&self) -> io::Result<Option<(StdStream, SocketAddr)>> {
match *self {
SocketListener::Tcp(ref lst) => lst
.accept_std()
.map(|(stream, addr)| Some((StdStream::Tcp(stream), SocketAddr::Tcp(addr)))),
#[cfg(all(unix, feature = "uds"))]
SocketListener::Uds(ref lst) => lst.accept_std().map(|res| {
res.map(|(stream, addr)| (StdStream::Uds(stream), SocketAddr::Uds(addr)))
}),
}
}
}
impl mio::Evented for SocketListener {
fn register(
&self,
poll: &mio::Poll,
token: mio::Token,
interest: mio::Ready,
opts: mio::PollOpt,
) -> io::Result<()> {
match *self {
SocketListener::Tcp(ref lst) => lst.register(poll, token, interest, opts),
#[cfg(all(unix, feature = "uds"))]
SocketListener::Uds(ref lst) => lst.register(poll, token, interest, opts),
}
}
fn reregister(
&self,
poll: &mio::Poll,
token: mio::Token,
interest: mio::Ready,
opts: mio::PollOpt,
) -> io::Result<()> {
match *self {
SocketListener::Tcp(ref lst) => lst.reregister(poll, token, interest, opts),
#[cfg(all(unix, feature = "uds"))]
SocketListener::Uds(ref lst) => lst.reregister(poll, token, interest, opts),
}
}
fn deregister(&self, poll: &mio::Poll) -> io::Result<()> {
match *self {
SocketListener::Tcp(ref lst) => lst.deregister(poll),
#[cfg(all(unix, feature = "uds"))]
SocketListener::Uds(ref lst) => {
let res = lst.deregister(poll);
// cleanup file path
if let Ok(addr) = lst.local_addr() {
if let Some(path) = addr.as_pathname() {
let _ = std::fs::remove_file(path);
}
}
res
}
}
}
}
pub trait FromStream: AsyncRead + AsyncWrite + Sized {
fn from_stdstream(sock: StdStream) -> io::Result<Self>;
}
impl FromStream for TcpStream {
fn from_stdstream(sock: StdStream) -> io::Result<Self> {
match sock {
StdStream::Tcp(stream) => TcpStream::from_std(stream, &Handle::default()),
#[cfg(all(unix, feature = "uds"))]
StdStream::Uds(_) => {
panic!("Should not happen, bug in server impl");
}
}
}
}
#[cfg(all(unix, feature = "uds"))]
impl FromStream for tokio_uds::UnixStream {
fn from_stdstream(sock: StdStream) -> io::Result<Self> {
match sock {
StdStream::Tcp(_) => panic!("Should not happen, bug in server impl"),
StdStream::Uds(stream) => {
tokio_uds::UnixStream::from_std(stream, &Handle::default())
}
}
}
}

View File

@@ -1,6 +1,6 @@
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::{mem, net, time};
use std::{mem, time};
use actix_rt::{spawn, Arbiter};
use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender};
@@ -12,6 +12,7 @@ use tokio_timer::{sleep, Delay};
use crate::accept::AcceptNotify;
use crate::counter::Counter;
use crate::services::{BoxedServerService, InternalServiceFactory, ServerMessage};
use crate::socket::{SocketAddr, StdStream};
use crate::Token;
pub(crate) struct WorkerCommand(Conn);
@@ -25,9 +26,9 @@ pub(crate) struct StopCommand {
#[derive(Debug)]
pub(crate) struct Conn {
pub io: net::TcpStream,
pub io: StdStream,
pub token: Token,
pub peer: Option<net::SocketAddr>,
pub peer: Option<SocketAddr>,
}
static MAX_CONNS: AtomicUsize = AtomicUsize::new(25600);
@@ -127,7 +128,7 @@ pub(crate) struct Worker {
services: Vec<Option<(usize, BoxedServerService)>>,
availability: WorkerAvailability,
conns: Counter,
factories: Vec<Box<InternalServiceFactory>>,
factories: Vec<Box<dyn InternalServiceFactory>>,
state: WorkerState,
shutdown_timeout: time::Duration,
}
@@ -136,7 +137,7 @@ impl Worker {
pub(crate) fn start(
rx: UnboundedReceiver<WorkerCommand>,
rx2: UnboundedReceiver<StopCommand>,
factories: Vec<Box<InternalServiceFactory>>,
factories: Vec<Box<dyn InternalServiceFactory>>,
availability: WorkerAvailability,
shutdown_timeout: time::Duration,
) {
@@ -237,7 +238,7 @@ enum WorkerState {
Restarting(
usize,
Token,
Box<Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>>,
Box<dyn Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>>,
),
Shutdown(Delay, Delay, oneshot::Sender<bool>),
}

View File

@@ -1,5 +1,18 @@
# Changes
## [0.4.2] - 2019-08-27
### Fixed
* Check service readiness for `new_apply_cfg` combinator
## [0.4.1] - 2019-06-06
### Added
* Add `new_apply_cfg` function
## [0.4.0] - 2019-05-12
### Changed

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-service"
version = "0.4.0"
version = "0.4.2"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix Service"
keywords = ["network", "framework", "async", "futures"]

View File

@@ -0,0 +1 @@
../LICENSE-APACHE

1
actix-service/LICENSE-MIT Symbolic link
View File

@@ -0,0 +1 @@
../LICENSE-MIT

View File

@@ -7,7 +7,17 @@ use crate::cell::Cell;
use crate::{IntoService, NewService, Service};
/// Convert `Fn(&Config, &mut Service) -> Future<Service>` fn to a NewService
pub fn apply_cfg<F, C, T, R, S>(srv: T, f: F) -> ApplyConfigService<F, C, T, R, S>
pub fn apply_cfg<F, C, T, R, S>(
srv: T,
f: F,
) -> impl NewService<
Config = C,
Request = S::Request,
Response = S::Response,
Error = S::Error,
Service = S,
InitError = R::Error,
> + Clone
where
F: FnMut(&C, &mut T) -> R,
T: Service,
@@ -22,8 +32,37 @@ where
}
}
/// Convert `Fn(&Config, &mut Service) -> Future<Service>` fn to a NewService
/// Service get constructor from NewService.
pub fn new_apply_cfg<F, C, T, R, S>(
srv: T,
f: F,
) -> impl NewService<
Config = C,
Request = S::Request,
Response = S::Response,
Error = S::Error,
Service = S,
InitError = T::InitError,
> + Clone
where
C: Clone,
F: FnMut(&C, &mut T::Service) -> R,
T: NewService<Config = ()>,
T::InitError: From<T::Error>,
R: IntoFuture<Error = T::InitError>,
R::Item: IntoService<S>,
S: Service,
{
ApplyConfigNewService {
f: Cell::new(f),
srv: Cell::new(srv),
_t: PhantomData,
}
}
/// Convert `Fn(&Config) -> Future<Service>` fn to NewService
pub struct ApplyConfigService<F, C, T, R, S>
struct ApplyConfigService<F, C, T, R, S>
where
F: FnMut(&C, &mut T) -> R,
T: Service,
@@ -79,7 +118,7 @@ where
}
}
pub struct FnNewServiceConfigFut<R, S>
struct FnNewServiceConfigFut<R, S>
where
R: IntoFuture,
R::Item: IntoService<S>,
@@ -102,3 +141,126 @@ where
Ok(Async::Ready(try_ready!(self.fut.poll()).into_service()))
}
}
/// Convert `Fn(&Config) -> Future<Service>` fn to NewService
struct ApplyConfigNewService<F, C, T, R, S>
where
C: Clone,
F: FnMut(&C, &mut T::Service) -> R,
T: NewService<Config = ()>,
R: IntoFuture<Error = T::InitError>,
R::Item: IntoService<S>,
S: Service,
{
f: Cell<F>,
srv: Cell<T>,
_t: PhantomData<(C, R, S)>,
}
impl<F, C, T, R, S> Clone for ApplyConfigNewService<F, C, T, R, S>
where
C: Clone,
F: FnMut(&C, &mut T::Service) -> R,
T: NewService<Config = ()>,
R: IntoFuture<Error = T::InitError>,
R::Item: IntoService<S>,
S: Service,
{
fn clone(&self) -> Self {
ApplyConfigNewService {
f: self.f.clone(),
srv: self.srv.clone(),
_t: PhantomData,
}
}
}
impl<F, C, T, R, S> NewService for ApplyConfigNewService<F, C, T, R, S>
where
C: Clone,
F: FnMut(&C, &mut T::Service) -> R,
T: NewService<Config = ()>,
T::InitError: From<T::Error>,
R: IntoFuture<Error = T::InitError>,
R::Item: IntoService<S>,
S: Service,
{
type Config = C;
type Request = S::Request;
type Response = S::Response;
type Error = S::Error;
type Service = S;
type InitError = R::Error;
type Future = ApplyConfigNewServiceFut<F, C, T, R, S>;
fn new_service(&self, cfg: &C) -> Self::Future {
ApplyConfigNewServiceFut {
f: self.f.clone(),
cfg: cfg.clone(),
fut: None,
srv: None,
srv_fut: Some(self.srv.get_ref().new_service(&())),
_t: PhantomData,
}
}
}
struct ApplyConfigNewServiceFut<F, C, T, R, S>
where
C: Clone,
F: FnMut(&C, &mut T::Service) -> R,
T: NewService<Config = ()>,
T::InitError: From<T::Error>,
R: IntoFuture<Error = T::InitError>,
R::Item: IntoService<S>,
S: Service,
{
cfg: C,
f: Cell<F>,
srv: Option<T::Service>,
srv_fut: Option<T::Future>,
fut: Option<R::Future>,
_t: PhantomData<(S,)>,
}
impl<F, C, T, R, S> Future for ApplyConfigNewServiceFut<F, C, T, R, S>
where
C: Clone,
F: FnMut(&C, &mut T::Service) -> R,
T: NewService<Config = ()>,
T::InitError: From<T::Error>,
R: IntoFuture<Error = T::InitError>,
R::Item: IntoService<S>,
S: Service,
{
type Item = S;
type Error = R::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(ref mut fut) = self.srv_fut {
match fut.poll()? {
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(srv) => {
let _ = self.srv_fut.take();
self.srv = Some(srv);
return self.poll();
}
}
}
if let Some(ref mut fut) = self.fut {
Ok(Async::Ready(try_ready!(fut.poll()).into_service()))
} else if let Some(ref mut srv) = self.srv {
match srv.poll_ready()? {
Async::NotReady => Ok(Async::NotReady),
Async::Ready(_) => {
self.fut = Some(self.f.get_mut()(&self.cfg, srv).into_future());
return self.poll();
}
}
} else {
Ok(Async::NotReady)
}
}
}

View File

@@ -4,7 +4,7 @@ use futures::{Async, Future, IntoFuture, Poll};
use crate::{NewService, Service};
pub type BoxedService<Req, Res, Err> = Box<
Service<
dyn Service<
Request = Req,
Response = Res,
Error = Err,
@@ -13,7 +13,7 @@ pub type BoxedService<Req, Res, Err> = Box<
>;
pub type BoxedServiceResponse<Res, Err> =
Either<FutureResult<Res, Err>, Box<Future<Item = Res, Error = Err>>>;
Either<FutureResult<Res, Err>, Box<dyn Future<Item = Res, Error = Err>>>;
pub struct BoxedNewService<C, Req, Res, Err, InitErr>(Inner<C, Req, Res, Err, InitErr>);
@@ -46,14 +46,14 @@ where
}
type Inner<C, Req, Res, Err, InitErr> = Box<
NewService<
dyn NewService<
Config = C,
Request = Req,
Response = Res,
Error = Err,
InitError = InitErr,
Service = BoxedService<Req, Res, Err>,
Future = Box<Future<Item = BoxedService<Req, Res, Err>, Error = InitErr>>,
Future = Box<dyn Future<Item = BoxedService<Req, Res, Err>, Error = InitErr>>,
>,
>;
@@ -70,7 +70,7 @@ where
type InitError = InitErr;
type Config = C;
type Service = BoxedService<Req, Res, Err>;
type Future = Box<Future<Item = Self::Service, Error = Self::InitError>>;
type Future = Box<dyn Future<Item = Self::Service, Error = Self::InitError>>;
fn new_service(&self, cfg: &C) -> Self::Future {
self.0.new_service(cfg)
@@ -99,7 +99,7 @@ where
type InitError = InitErr;
type Config = C;
type Service = BoxedService<Req, Res, Err>;
type Future = Box<Future<Item = Self::Service, Error = Self::InitError>>;
type Future = Box<dyn Future<Item = Self::Service, Error = Self::InitError>>;
fn new_service(&self, cfg: &C) -> Self::Future {
Box::new(
@@ -133,7 +133,7 @@ where
type Error = Err;
type Future = Either<
FutureResult<Self::Response, Self::Error>,
Box<Future<Item = Self::Response, Error = Self::Error>>,
Box<dyn Future<Item = Self::Response, Error = Self::Error>>,
>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {

View File

@@ -26,10 +26,15 @@ impl<T> Cell<T> {
}
}
pub(crate) fn get_ref(&self) -> &T {
unsafe { &*self.inner.as_ref().get() }
}
pub(crate) fn get_mut(&mut self) -> &mut T {
unsafe { &mut *self.inner.as_ref().get() }
}
#[allow(clippy::mut_from_ref)]
pub(crate) unsafe fn get_mut_unsafe(&self) -> &mut T {
&mut *self.inner.as_ref().get()
}

View File

@@ -25,7 +25,7 @@ mod transform_err;
pub use self::and_then::{AndThen, AndThenNewService};
pub use self::apply::{apply_fn, new_apply_fn, Apply, ApplyNewService};
pub use self::apply_cfg::apply_cfg;
pub use self::apply_cfg::{apply_cfg, new_apply_cfg};
pub use self::fn_service::{new_service_cfg, new_service_fn, service_fn, ServiceFn};
pub use self::fn_transform::transform_fn;
pub use self::from_err::{FromErr, FromErrNewService};
@@ -176,7 +176,7 @@ impl<T: ?Sized> ServiceExt for T where T: Service {}
/// Creates new `Service` values.
///
/// Acts as a service factory. This is useful for cases where new `Service`
/// values must be produced. One case is a TCP servier listener. The listner
/// values must be produced. One case is a TCP server listener. The listener
/// accepts new TCP streams, obtains a new `Service` value using the
/// `NewService` trait, and uses that new `Service` value to process inbound
/// requests on that new TCP stream.

View File

@@ -6,17 +6,11 @@ use futures::{Async, Future, IntoFuture, Poll};
use crate::transform_err::{TransformFromErr, TransformMapInitErr};
use crate::{IntoNewService, NewService, Service};
/// The `Transform` trait defines the interface of a Service factory. `Transform`
/// is often implemented for middleware, defining how to manufacture a
/// middleware Service. A Service that is manufactured by the factory takes
/// The `Transform` trait defines the interface of a Service factory. `Transform`
/// is often implemented for middleware, defining how to construct a
/// middleware Service. A Service that is constructed by the factory takes
/// the Service that follows it during execution as a parameter, assuming
/// ownership of the next Service. A Service can be a variety of types, such
/// as (but not limited to) another middleware Service, an extractor Service,
/// other helper Services, or the request handler endpoint Service.
///
/// A Service is created by the factory during server initialization.
///
/// `Config` is a service factory configuration type.
/// ownership of the next Service.
pub trait Transform<S> {
/// Requests handled by the service.
type Request;

View File

@@ -0,0 +1 @@
../LICENSE-APACHE

View File

@@ -0,0 +1 @@
../LICENSE-MIT

View File

@@ -1,5 +1,17 @@
# Changes
## [0.1.2] - 2019-08-05
### Changed
* Update `derive_more` to 0.15
* Update `parking_lot` to 0.9
## [0.1.1] - 2019-06-05
* Update parking_lot
## [0.1.0] - 2019-03-28
* Move threadpool to separate crate

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-threadpool"
version = "0.1.0"
version = "0.1.2"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix thread pool for sync code"
keywords = ["actix", "network", "framework", "async", "futures"]
@@ -18,9 +18,9 @@ name = "actix_threadpool"
path = "src/lib.rs"
[dependencies]
derive_more = "0.14"
derive_more = "0.15"
futures = "0.1.25"
parking_lot = "0.7"
parking_lot = "0.9"
lazy_static = "1.2"
log = "0.4"
num_cpus = "1.10"

View File

@@ -0,0 +1 @@
../LICENSE-APACHE

View File

@@ -0,0 +1 @@
../LICENSE-MIT

1
actix-tower/LICENSE-APACHE Symbolic link
View File

@@ -0,0 +1 @@
../LICENSE-APACHE

1
actix-tower/LICENSE-MIT Symbolic link
View File

@@ -0,0 +1 @@
../LICENSE-MIT

View File

@@ -1,4 +1,60 @@
//! Utilities to ease interoperability with services based on the `tower-service` crate.
//! Utilities to provide interoperability between services based on the
//! `actix-service` and `tower-service` crates.
//!
//! ## Example
//!
//! In the following example, we take a `RandomService`—which will always
//! return 4—and wraps it with a middleware that will always add 1 to the
//! result. This pattern can be further used to wrap services from either
//! `tower-service` or `actix-service` with middleware provided by the other.
//!
//! ```
//! use actix_tower::ActixServiceExt;
//! # use futures::{Async, Future};
//! use actix_service::Service;
//!
//! struct RandomService;
//! impl Service for RandomService {
//! // …
//! # type Request = ();
//! # type Response = u32;
//! # type Error = ();
//! # type Future = futures::future::FutureResult<Self::Response, Self::Error>;
//! #
//! # fn poll_ready(&mut self) -> futures::Poll<(), Self::Error> {
//! # Ok(Async::Ready(()))
//! # }
//! #
//! # fn call(&mut self, _req: Self::Request) -> Self::Future {
//! # futures::finished(4)
//! # }
//! }
//!
//! struct AddOneMiddleware<S>(S);
//! impl<S, R> tower_service::Service<R> for AddOneMiddleware<S>
//! where
//! S: tower_service::Service<R, Response = u32>,
//! S::Future: 'static,
//! {
//! /// …
//! # type Response = u32;
//! # type Error = S::Error;
//! # type Future = Box<dyn Future<Item = Self::Response, Error = Self::Error>>;
//! #
//! # fn poll_ready(&mut self) -> futures::Poll<(), Self::Error> {
//! # self.0.poll_ready()
//! # }
//! #
//! # fn call(&mut self, req: R) -> Self::Future {
//! # let fut = self.0.call(req).map(|x| x + 1);
//! # Box::new(fut)
//! # }
//! }
//!
//! let mut s = RandomService.wrap_with_tower_middleware(AddOneMiddleware);
//! assert_eq!(Ok(Async::Ready(())), s.poll_ready());
//! assert_eq!(Ok(Async::Ready(5)), s.call(()).poll());
//! ```
use actix_service::Service as ActixService;
use std::marker::PhantomData;
@@ -6,40 +62,120 @@ use tower_service::Service as TowerService;
/// Compatibility wrapper associating a `tower_service::Service` with a particular
/// `Request` type, so that it can be used as an `actix_service::Service`.
pub struct TowerCompat<S, R> {
///
/// Generally created through convenience methods on the `TowerServiceExt<R>` trait.
pub struct ActixCompat<S, R> {
inner: S,
_phantom: PhantomData<R>,
}
impl<S, R> TowerCompat<S, R> {
impl<S, R> ActixCompat<S, R> {
/// Wraps a `tower_service::Service` in a compatibility wrapper.
pub fn new(inner: S) -> Self {
TowerCompat {
ActixCompat {
inner,
_phantom: PhantomData,
}
}
}
/// Extension trait for wrapping `tower_service::Service` instances for use as
/// Extension trait for wrapping a `tower_service::Service` instance for use as
/// an `actix_service::Service`.
pub trait TowerServiceExt {
pub trait TowerServiceExt<R>: TowerService<R> + Sized {
/// Wraps a `tower_service::Service` in a compatibility wrapper.
fn compat<R>(self) -> TowerCompat<Self, R>
where
Self: TowerService<R> + Sized;
}
///
/// ```
/// use actix_service::Service;
/// use actix_tower::TowerServiceExt;
/// # use futures::{Async, Future};
///
/// struct RandomService;
/// impl<R> tower_service::Service<R> for RandomService {
/// // …
/// # type Response = u32;
/// # type Error = ();
/// # type Future = futures::future::FutureResult<Self::Response, Self::Error>;
/// #
/// # fn poll_ready(&mut self) -> futures::Poll<(), Self::Error> {
/// # Ok(Async::Ready(()))
/// # }
/// #
/// # fn call(&mut self, _req: R) -> Self::Future {
/// # futures::finished(4)
/// # }
/// }
///
/// let mut s = RandomService.into_actix_service();
/// assert_eq!(Ok(Async::Ready(())), s.poll_ready());
/// assert_eq!(Ok(Async::Ready(4)), s.call(()).poll());
/// ```
fn into_actix_service(self) -> ActixCompat<Self, R> {
ActixCompat::new(self)
}
impl<S> TowerServiceExt for S {
fn compat<R>(self) -> TowerCompat<Self, R>
/// Takes a function that, when provided with an `actix_service::Service` wraps it
/// and returns a new service. Useful for wrapping a `tower_service::Service` with
/// middleware built for `actix_service`.
///
/// ```
/// use actix_tower::TowerServiceExt;
/// # use futures::{Async, Future};
/// use tower_service::Service;
///
/// struct RandomService;
/// impl<R> Service<R> for RandomService {
/// // …
/// # type Response = u32;
/// # type Error = ();
/// # type Future = futures::future::FutureResult<Self::Response, Self::Error>;
/// #
/// # fn poll_ready(&mut self) -> futures::Poll<(), Self::Error> {
/// # Ok(Async::Ready(()))
/// # }
/// #
/// # fn call(&mut self, _req: R) -> Self::Future {
/// # futures::finished(4)
/// # }
/// }
///
/// struct AddOneTransform<S>(S);
/// impl<S> actix_service::Service for AddOneTransform<S>
/// where
/// S: actix_service::Service<Response = u32>,
/// S::Future: 'static,
/// {
/// /// …
/// # type Request = S::Request;
/// # type Response = u32;
/// # type Error = S::Error;
/// # type Future = Box<dyn Future<Item = Self::Response, Error = Self::Error>>;
/// #
/// # fn poll_ready(&mut self) -> futures::Poll<(), Self::Error> {
/// # self.0.poll_ready()
/// # }
/// #
/// # fn call(&mut self, req: Self::Request) -> Self::Future {
/// # let fut = self.0.call(req).map(|x| x + 1);
/// # Box::new(fut)
/// # }
/// }
///
/// let mut s = RandomService.wrap_with_actix_middleware(AddOneTransform);
/// assert_eq!(Ok(Async::Ready(())), s.poll_ready());
/// assert_eq!(Ok(Async::Ready(5)), s.call(()).poll());
/// ```
fn wrap_with_actix_middleware<F, U>(self, f: F) -> TowerCompat<U>
where
Self: TowerService<R> + Sized
F: FnOnce(ActixCompat<Self, R>) -> U,
U: ActixService<Request = R>,
{
TowerCompat::new(self)
f(self.into_actix_service()).into_tower_service()
}
}
impl<S, R> ActixService for TowerCompat<S, R>
impl<S, R> TowerServiceExt<R> for S where S: TowerService<R> + Sized {}
impl<S, R> ActixService for ActixCompat<S, R>
where
S: TowerService<R>,
{
@@ -57,135 +193,388 @@ where
}
}
/// Compatibility wrapper associating an `actix_service::Service` with a particular
/// `Request` type, so that it can be used as a `tower_service::Service`.
///
/// Generally created through convenience methods on the `ActixServiceExt<R>` trait.
pub struct TowerCompat<S> {
inner: S,
}
impl<S> TowerCompat<S> {
/// Wraps an `actix_service::Service` in a compatibility wrapper.
pub fn new(inner: S) -> Self {
TowerCompat { inner }
}
}
/// Extension trait for wrapping an `actix_service::Service` instance for use as
/// a `tower_service::Service`.
pub trait ActixServiceExt: ActixService + Sized {
/// Wraps a `tower_service::Service` in a compatibility wrapper.
///
/// ```
/// use actix_tower::ActixServiceExt;
/// # use futures::{Async, Future};
/// use tower_service::Service;
///
/// struct RandomService;
/// impl actix_service::Service for RandomService {
/// // …
/// # type Request = ();
/// # type Response = u32;
/// # type Error = ();
/// # type Future = futures::future::FutureResult<Self::Response, Self::Error>;
/// #
/// # fn poll_ready(&mut self) -> futures::Poll<(), Self::Error> {
/// # Ok(Async::Ready(()))
/// # }
/// #
/// # fn call(&mut self, _req: Self::Request) -> Self::Future {
/// # futures::finished(4)
/// # }
/// }
///
/// let mut s = RandomService.into_tower_service();
/// assert_eq!(Ok(Async::Ready(())), s.poll_ready());
/// assert_eq!(Ok(Async::Ready(4)), s.call(()).poll());
/// ```
fn into_tower_service(self) -> TowerCompat<Self> {
TowerCompat::new(self)
}
/// Takes a function that, when provided with a `tower_service::Service` wraps it
/// and returns a new service. Useful for wrapping an `actix_service::Service` with
/// middleware built for `tower_service`.
///
/// ```
/// use actix_tower::ActixServiceExt;
/// # use futures::{Async, Future};
/// use actix_service::Service;
///
/// struct RandomService;
/// impl Service for RandomService {
/// // …
/// # type Request = ();
/// # type Response = u32;
/// # type Error = ();
/// # type Future = futures::future::FutureResult<Self::Response, Self::Error>;
/// #
/// # fn poll_ready(&mut self) -> futures::Poll<(), Self::Error> {
/// # Ok(Async::Ready(()))
/// # }
/// #
/// # fn call(&mut self, _req: Self::Request) -> Self::Future {
/// # futures::finished(4)
/// # }
/// }
///
/// struct AddOneMiddleware<S>(S);
/// impl<S, R> tower_service::Service<R> for AddOneMiddleware<S>
/// where
/// S: tower_service::Service<R, Response = u32>,
/// S::Future: 'static,
/// {
/// /// …
/// # type Response = u32;
/// # type Error = S::Error;
/// # type Future = Box<dyn Future<Item = Self::Response, Error = Self::Error>>;
/// #
/// # fn poll_ready(&mut self) -> futures::Poll<(), Self::Error> {
/// # self.0.poll_ready()
/// # }
/// #
/// # fn call(&mut self, req: R) -> Self::Future {
/// # let fut = self.0.call(req).map(|x| x + 1);
/// # Box::new(fut)
/// # }
/// }
///
/// let mut s = RandomService.wrap_with_tower_middleware(AddOneMiddleware);
/// assert_eq!(Ok(Async::Ready(())), s.poll_ready());
/// assert_eq!(Ok(Async::Ready(5)), s.call(()).poll());
/// ```
fn wrap_with_tower_middleware<F, U>(self, f: F) -> ActixCompat<U, Self::Request>
where
F: FnOnce(TowerCompat<Self>) -> U,
U: TowerService<Self::Request>,
{
f(self.into_tower_service()).into_actix_service()
}
}
impl<S> ActixServiceExt for S where S: ActixService + Sized {}
impl<S> TowerService<S::Request> for TowerCompat<S>
where
S: ActixService,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self) -> futures::Poll<(), Self::Error> {
ActixService::poll_ready(&mut self.inner)
}
fn call(&mut self, req: S::Request) -> Self::Future {
ActixService::call(&mut self.inner, req)
}
}
#[cfg(test)]
mod tests {
use super::TowerServiceExt;
use actix_service::{Service as ActixService, ServiceExt, Transform};
use futures::{future::FutureResult, Async, Poll, Future};
use tower_service::Service as TowerService;
mod tower_service_into_actix_service {
use crate::TowerServiceExt;
use actix_service::{Service as ActixService, ServiceExt, Transform};
use futures::{future::FutureResult, Async, Future, Poll};
use tower_service::Service as TowerService;
struct RandomService;
impl<R> TowerService<R> for RandomService {
type Response = u32;
type Error = ();
type Future = FutureResult<Self::Response, Self::Error>;
#[test]
fn random_service_returns_4() {
let mut s = RandomService.into_actix_service();
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
assert_eq!(Ok(Async::Ready(())), s.poll_ready());
assert_eq!(Ok(Async::Ready(4)), s.call(()).poll());
}
fn call(&mut self, _req: R) -> Self::Future {
futures::finished(4)
#[test]
fn random_service_can_combine() {
let mut s = RandomService.into_actix_service().map(|x| x + 1);
assert_eq!(Ok(Async::Ready(())), s.poll_ready());
assert_eq!(Ok(Async::Ready(5)), s.call(()).poll());
}
#[test]
fn random_service_can_use_actix_middleware() {
let mut s = RandomService.wrap_with_actix_middleware(DoMathTransform);
assert_eq!(Ok(Async::Ready(())), s.poll_ready());
assert_eq!(Ok(Async::Ready(68)), s.call(()).poll());
}
#[test]
fn random_service_and_add_service_chained() {
let s1 = RandomService.into_actix_service();
let s2 = AddOneService.into_actix_service();
let s3 = AddOneService.into_actix_service();
let mut s = s1.and_then(s2).and_then(s3);
assert_eq!(Ok(Async::Ready(())), s.poll_ready());
assert_eq!(Ok(Async::Ready(6)), s.call(()).poll());
}
#[test]
fn random_service_and_add_service_and_ignoring_service_chained() {
let s1 = RandomService.into_actix_service();
let s2 = AddOneService.into_actix_service();
let s3 = AddOneService.into_actix_service();
let s4 = RandomService.into_actix_service();
let mut s = s1.and_then(s2).and_then(s3).and_then(s4);
assert_eq!(Ok(Async::Ready(())), s.poll_ready());
assert_eq!(Ok(Async::Ready(4)), s.call(()).poll());
}
#[test]
fn random_service_can_be_transformed_to_do_math() {
let transform = DoMath;
let mut s = transform
.new_transform(RandomService.into_actix_service())
.wait()
.unwrap();
assert_eq!(Ok(Async::Ready(())), s.poll_ready());
assert_eq!(Ok(Async::Ready(68)), s.call(()).poll());
}
struct RandomService;
impl<R> TowerService<R> for RandomService {
type Response = u32;
type Error = ();
type Future = FutureResult<Self::Response, Self::Error>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, _req: R) -> Self::Future {
futures::finished(4)
}
}
struct AddOneService;
impl TowerService<u32> for AddOneService {
type Response = u32;
type Error = ();
type Future = FutureResult<Self::Response, Self::Error>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, req: u32) -> Self::Future {
futures::finished(req + 1)
}
}
struct DoMathTransform<S>(S);
impl<S> ActixService for DoMathTransform<S>
where
S: ActixService<Response = u32>,
S::Future: 'static,
{
type Request = S::Request;
type Response = u32;
type Error = S::Error;
type Future = Box<dyn Future<Item = Self::Response, Error = Self::Error>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.0.poll_ready()
}
fn call(&mut self, req: Self::Request) -> Self::Future {
let fut = self.0.call(req).map(|x| x * 17);
Box::new(fut)
}
}
struct DoMath;
impl<S> Transform<S> for DoMath
where
S: ActixService<Response = u32>,
S::Future: 'static,
{
type Request = S::Request;
type Response = u32;
type Error = S::Error;
type Transform = DoMathTransform<S>;
type InitError = ();
type Future = FutureResult<Self::Transform, Self::InitError>;
fn new_transform(&self, service: S) -> Self::Future {
futures::finished(DoMathTransform(service))
}
}
}
#[test]
fn tower_service_as_actix_service_returns_4() {
let mut s = RandomService.compat();
mod actix_service_into_tower_service {
use crate::{ActixServiceExt, TowerServiceExt};
use actix_service::{Service as ActixService, ServiceExt};
use futures::{future::FutureResult, Async, Future, Poll};
use tower_service::Service as TowerService;
assert_eq!(Ok(Async::Ready(())), s.poll_ready());
#[test]
fn random_service_returns_4() {
let mut s = RandomService.into_tower_service();
assert_eq!(Ok(Async::Ready(4)), s.call(()).poll());
}
assert_eq!(Ok(Async::Ready(())), s.poll_ready());
#[test]
fn tower_service_as_actix_service_can_combine() {
let mut s = RandomService.compat().map(|x| x + 1);
assert_eq!(Ok(Async::Ready(())), s.poll_ready());
assert_eq!(Ok(Async::Ready(5)), s.call(()).poll());
}
struct AddOneService;
impl TowerService<u32> for AddOneService {
type Response = u32;
type Error = ();
type Future = FutureResult<Self::Response, Self::Error>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
assert_eq!(Ok(Async::Ready(4)), s.call(()).poll());
}
fn call(&mut self, req: u32) -> Self::Future {
futures::finished(req + 1)
}
}
#[test]
fn random_service_can_use_tower_middleware() {
let mut s =
AddOneService::wrap(RandomService.into_tower_service()).into_actix_service();
#[test]
fn tower_services_as_actix_services_chained() {
let s1 = RandomService.compat();
let s2 = AddOneService.compat();
let s3 = AddOneService.compat();
assert_eq!(Ok(Async::Ready(())), s.poll_ready());
let mut s = s1.and_then(s2).and_then(s3);
assert_eq!(Ok(Async::Ready(())), s.poll_ready());
assert_eq!(Ok(Async::Ready(6)), s.call(()).poll());
}
#[test]
fn tower_services_as_actix_services_chained_2() {
let s1 = RandomService.compat();
let s2 = AddOneService.compat();
let s3 = AddOneService.compat();
let s4 = RandomService.compat();
let mut s = s1.and_then(s2).and_then(s3).and_then(s4);
assert_eq!(Ok(Async::Ready(())), s.poll_ready());
assert_eq!(Ok(Async::Ready(4)), s.call(()).poll());
}
struct DoMathTransform<S>(S);
impl<S> ActixService for DoMathTransform<S>
where
S: ActixService<Response = u32>,
S::Future: 'static,
{
type Request = S::Request;
type Response = u32;
type Error = S::Error;
type Future = Box<dyn Future<Item = Self::Response, Error = Self::Error>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.0.poll_ready()
assert_eq!(Ok(Async::Ready(5)), s.call(()).poll());
}
fn call(&mut self, req: Self::Request) -> Self::Future {
let fut = self.0.call(req).map(|x| x * 17);
Box::new(fut)
#[test]
fn do_math_service_can_use_tower_middleware() {
let mut s =
AddOneService::wrap(DoMathService.into_tower_service()).into_actix_service();
assert_eq!(Ok(Async::Ready(())), s.poll_ready());
assert_eq!(Ok(Async::Ready(188)), s.call(11).poll());
}
#[test]
fn random_service_and_add_service_and_ignoring_service_chained() {
let s1 = RandomService.wrap_with_tower_middleware(AddOneService::wrap);
let s2 = DoMathService.wrap_with_tower_middleware(AddOneService::wrap);
let mut s = s1.and_then(s2);
assert_eq!(Ok(Async::Ready(())), s.poll_ready());
assert_eq!(Ok(Async::Ready(86)), s.call(()).poll());
}
struct RandomService;
impl ActixService for RandomService {
type Request = ();
type Response = u32;
type Error = ();
type Future = FutureResult<Self::Response, Self::Error>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, _req: Self::Request) -> Self::Future {
futures::finished(4)
}
}
struct AddOneService<S> {
inner: S,
}
impl<S> AddOneService<S> {
fn wrap(inner: S) -> Self {
AddOneService { inner }
}
}
impl<S, R> TowerService<R> for AddOneService<S>
where
S: TowerService<R, Response = u32>,
S::Future: 'static,
{
type Response = u32;
type Error = S::Error;
type Future = Box<dyn Future<Item = Self::Response, Error = Self::Error>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.inner.poll_ready()
}
fn call(&mut self, req: R) -> Self::Future {
let fut = self.inner.call(req).map(|x| x + 1);
Box::new(fut)
}
}
struct DoMathService;
impl ActixService for DoMathService {
type Request = u32;
type Response = u32;
type Error = ();
type Future = FutureResult<Self::Response, Self::Error>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, req: Self::Request) -> Self::Future {
futures::finished(req * 17)
}
}
}
struct DoMath;
impl<S> Transform<S> for DoMath
where
S: ActixService<Response = u32>,
S::Future: 'static,
{
type Request = S::Request;
type Response = u32;
type Error = S::Error;
type Transform = DoMathTransform<S>;
type InitError = ();
type Future = FutureResult<Self::Transform, Self::InitError>;
fn new_transform(&self, service: S) -> Self::Future {
futures::finished(DoMathTransform(service))
}
}
#[test]
fn tower_service_as_actix_service_can_be_transformed() {
let transform = DoMath;
let mut s = transform.new_transform(RandomService.compat()).wait().unwrap();
assert_eq!(Ok(Async::Ready(())), s.poll_ready());
assert_eq!(Ok(Async::Ready(68)), s.call(()).poll());
}
}
}

View File

@@ -1,5 +1,42 @@
# Changes
## [0.4.5] - 2019-07-19
### Removed
* Deprecated `CloneableService` as it is not safe
## [0.4.4] - 2019-07-17
### Changed
* Undeprecate `FramedTransport` as it is actually useful
## [0.4.3] - 2019-07-17
### Deprecated
* Deprecate `CloneableService` as it is not safe and in general not very useful
* Deprecate `FramedTransport` in favor of `actix-ioframe`
## [0.4.2] - 2019-06-26
### Fixed
* Do not block on sink drop for FramedTransport
## [0.4.1] - 2019-05-15
### Changed
* Change `Either` constructor
## [0.4.0] - 2019-05-11
### Changed

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-utils"
version = "0.4.0"
version = "0.4.5"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix utils - various actix net related services"
keywords = ["network", "framework", "async", "futures"]
@@ -18,8 +18,8 @@ name = "actix_utils"
path = "src/lib.rs"
[dependencies]
actix-service = "0.4.0"
actix-codec = "0.1.1"
actix-service = "0.4.1"
actix-codec = "0.1.2"
bytes = "0.4"
either = "1.5.2"
futures = "0.1.25"

1
actix-utils/LICENSE-APACHE Symbolic link
View File

@@ -0,0 +1 @@
../LICENSE-APACHE

1
actix-utils/LICENSE-MIT Symbolic link
View File

@@ -0,0 +1 @@
../LICENSE-MIT

View File

@@ -1,52 +0,0 @@
use std::marker::PhantomData;
use std::rc::Rc;
use actix_service::Service;
use futures::Poll;
use super::cell::Cell;
/// Service that allows to turn non-clone service to a service with `Clone` impl
pub struct CloneableService<T> {
service: Cell<T>,
_t: PhantomData<Rc<()>>,
}
impl<T> CloneableService<T> {
pub fn new(service: T) -> Self
where
T: Service,
{
Self {
service: Cell::new(service),
_t: PhantomData,
}
}
}
impl<T> Clone for CloneableService<T> {
fn clone(&self) -> Self {
Self {
service: self.service.clone(),
_t: PhantomData,
}
}
}
impl<T> Service for CloneableService<T>
where
T: Service,
{
type Request = T::Request;
type Response = T::Response;
type Error = T::Error;
type Future = T::Future;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.service.get_mut().poll_ready()
}
fn call(&mut self, req: T::Request) -> Self::Future {
self.service.get_mut().call(req)
}
}

View File

@@ -57,25 +57,21 @@ pub struct Either<A, B> {
}
impl<A, B> Either<A, B> {
pub fn new_a<F>(srv: F) -> Either<A, ()>
pub fn new<F1, F2>(srv_a: F1, srv_b: F2) -> Either<A, B>
where
A: NewService,
F: IntoNewService<A>,
B: NewService<
Config = A::Config,
Response = A::Response,
Error = A::Error,
InitError = A::InitError,
>,
F1: IntoNewService<A>,
F2: IntoNewService<B>,
{
Either {
left: srv.into_new_service(),
right: (),
}
}
pub fn new_b<F>(srv: F) -> Either<(), B>
where
B: NewService,
F: IntoNewService<B>,
{
Either {
left: (),
right: srv.into_new_service(),
left: srv_a.into_new_service(),
right: srv_b.into_new_service(),
}
}
}

View File

@@ -187,10 +187,12 @@ where
return true;
}
Ok(Async::Ready(None)) => {
rx_done = true;
let _ = self.rx.take();
}
Ok(Async::NotReady) => rx_done = true,
Err(_e) => {
rx_done = true;
let _ = self.rx.take();
}
}

View File

@@ -119,7 +119,7 @@ mod tests {
type Request = ();
type Response = ();
type Error = ();
type Future = Box<Future<Item = (), Error = ()>>;
type Future = Box<dyn Future<Item = (), Error = ()>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))

View File

@@ -1,6 +1,6 @@
//! Actix utils - various helper services
mod cell;
pub mod cloneable;
pub mod counter;
pub mod either;
pub mod framed;

View File

@@ -214,7 +214,7 @@ mod tests {
type Request = oneshot::Receiver<usize>;
type Response = usize;
type Error = ();
type Future = Box<Future<Item = usize, Error = ()>>;
type Future = Box<dyn Future<Item = usize, Error = ()>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))

View File

@@ -45,7 +45,7 @@ where
type Request = S;
type Response = ();
type Error = E;
type Future = Box<Future<Item = (), Error = E>>;
type Future = Box<dyn Future<Item = (), Error = E>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))

View File

@@ -190,7 +190,7 @@ mod tests {
type Request = ();
type Response = ();
type Error = ();
type Future = Box<Future<Item = (), Error = ()>>;
type Future = Box<dyn Future<Item = (), Error = ()>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))

View File

@@ -1,5 +1,13 @@
# Changes
## [0.1.5] - 2019-05-15
* Remove debug prints
## [0.1.4] - 2019-05-15
* Fix checked resource match
## [0.1.3] - 2019-04-22
* Added support for `remainder match` (i.e "/path/{tail}*")

View File

@@ -1,8 +1,8 @@
[package]
name = "actix-router"
version = "0.1.3"
version = "0.1.5"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Path table router"
description = "Path router"
keywords = ["actix"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"

1
router/LICENSE-APACHE Symbolic link
View File

@@ -0,0 +1 @@
../LICENSE-APACHE

1
router/LICENSE-MIT Symbolic link
View File

@@ -0,0 +1 @@
../LICENSE-MIT

View File

@@ -151,7 +151,7 @@ impl<'de, T: ResourcePath + 'de> Deserializer<'de> for PathDeserializer<'de, T>
where
V: Visitor<'de>,
{
if self.path.len() < 1 {
if self.path.is_empty() {
Err(de::value::Error::custom(
"expeceted at least one parameters",
))

View File

@@ -93,7 +93,7 @@ impl<T: ResourcePath> Path<T> {
#[inline]
/// Skip first `n` chars in path
pub fn skip(&mut self, n: u16) {
self.skip = self.skip + n;
self.skip += n;
}
pub(crate) fn add(&mut self, name: Rc<String>, value: PathItem) {

View File

@@ -5,7 +5,7 @@ use std::rc::Rc;
use regex::{escape, Regex};
use crate::path::{Path, PathItem};
use crate::ResourcePath;
use crate::{Resource, ResourcePath};
const MAX_DYNAMIC_SEGMENTS: usize = 16;
@@ -207,7 +207,7 @@ impl ResourceDef {
"Dynamic path match but not all segments found: {}",
name
);
false;
return false;
}
}
} else {
@@ -240,6 +240,90 @@ impl ResourceDef {
}
}
/// Is the given path and parameters a match against this pattern?
pub fn match_path_checked<R, T, F, U>(
&self,
res: &mut R,
check: &F,
user_data: &Option<U>,
) -> bool
where
T: ResourcePath,
R: Resource<T>,
F: Fn(&R, &Option<U>) -> bool,
{
match self.tp {
PatternType::Static(ref s) => {
if s == res.resource_path().path() && check(res, user_data) {
let path = res.resource_path();
path.skip(path.len() as u16);
true
} else {
false
}
}
PatternType::Dynamic(ref re, ref names, len) => {
let mut idx = 0;
let mut pos = 0;
let mut segments: [PathItem; MAX_DYNAMIC_SEGMENTS] =
[PathItem::Static(""); MAX_DYNAMIC_SEGMENTS];
if let Some(captures) = re.captures(res.resource_path().path()) {
for (no, name) in names.iter().enumerate() {
if let Some(m) = captures.name(&name) {
idx += 1;
pos = m.end();
segments[no] = PathItem::Segment(m.start() as u16, m.end() as u16);
} else {
log::error!(
"Dynamic path match but not all segments found: {}",
name
);
return false;
}
}
} else {
return false;
}
if !check(res, user_data) {
return false;
}
let path = res.resource_path();
for idx in 0..idx {
path.add(names[idx].clone(), segments[idx]);
}
path.skip((pos + len) as u16);
true
}
PatternType::Prefix(ref s) => {
let len = {
let rpath = res.resource_path().path();
if s == rpath {
s.len()
} else if rpath.starts_with(s)
&& (s.ends_with('/') || rpath.split_at(s.len()).1.starts_with('/'))
{
if s.ends_with('/') {
s.len() - 1
} else {
s.len()
}
} else {
return false;
}
};
if !check(res, user_data) {
return false;
}
let path = res.resource_path();
path.skip(min(path.path().len(), len) as u16);
true
}
}
}
/// Build resource path from elements. Returns `true` on success.
pub fn resource_path<U, I>(&self, path: &mut String, elements: &mut U) -> bool
where

View File

@@ -19,26 +19,26 @@ impl<T, U> Router<T, U> {
}
}
pub fn recognize<R, P>(&self, path: &mut R) -> Option<(&T, ResourceId)>
pub fn recognize<R, P>(&self, resource: &mut R) -> Option<(&T, ResourceId)>
where
R: Resource<P>,
P: ResourcePath,
{
for item in self.0.iter() {
if item.0.match_path(path.resource_path()) {
if item.0.match_path(resource.resource_path()) {
return Some((&item.1, ResourceId(item.0.id())));
}
}
None
}
pub fn recognize_mut<R, P>(&mut self, res: &mut R) -> Option<(&mut T, ResourceId)>
pub fn recognize_mut<R, P>(&mut self, resource: &mut R) -> Option<(&mut T, ResourceId)>
where
R: Resource<P>,
P: ResourcePath,
{
for item in self.0.iter_mut() {
if item.0.match_path(res.resource_path()) {
if item.0.match_path(resource.resource_path()) {
return Some((&mut item.1, ResourceId(item.0.id())));
}
}
@@ -47,7 +47,7 @@ impl<T, U> Router<T, U> {
pub fn recognize_mut_checked<R, P, F>(
&mut self,
res: &mut R,
resource: &mut R,
check: F,
) -> Option<(&mut T, ResourceId)>
where
@@ -56,7 +56,7 @@ impl<T, U> Router<T, U> {
P: ResourcePath,
{
for item in self.0.iter_mut() {
if item.0.match_path(res.resource_path()) && check(res, &item.2) {
if item.0.match_path_checked(resource, &check, &item.2) {
return Some((&mut item.1, ResourceId(item.0.id())));
}
}