1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-08-15 06:45:56 +02:00

Compare commits

...

22 Commits

Author SHA1 Message Date
Nikolay Kim
34a7b7f05a add TcpStreamService 2019-09-05 16:34:48 -07:00
Nikolay Kim
b1d9b06a87 Use arbiters storage for default async resolver 2019-09-02 15:15:55 -07:00
Nikolay Kim
94e673b50b Add arbiter specific storage 2019-09-02 15:03:03 -07:00
Nikolay Kim
1a644c6bb1 Check service readiness for new_apply_cfg combinator 2019-08-27 05:28:15 +06:00
Yuki Okushi
aad013f559 Fix clippy warnings (#40)
Add explicit `dyn`s

Remove let binding

Use +=

Return false

Derive Default for TcpConnector

Squash if/else

Remove unnecessary return keywords

Use is_empty()

Fix clippy attribute

Allow mut_from_ref
2019-08-17 05:15:51 +09:00
Aron Heinecke
7a18d9da26 Add check for minimum rust version (#39) 2019-08-07 17:49:29 -07:00
Aron Heinecke
d59b8ce62e Fix invalid minimum version (#38) 2019-08-07 17:48:31 -07:00
Nikolay Kim
3821d511d0 prep actix-threadpool release 2019-08-05 09:54:49 -07:00
Nikolay Kim
62e429cb0c Merge branch 'master' of github.com:actix/actix-net 2019-08-05 09:53:03 -07:00
Nikolay Kim
a2643d475a Add ConnectService and OpensslConnectService 2019-08-05 09:52:50 -07:00
Sven-Hendrik Haase
34c259a8b5 Merge pull request #35 from ignatenkobrain/parking_lot
threadpool: Update parking_lot to 0.9
2019-08-05 17:30:19 +02:00
Igor Gnatenko
8b398c3386 threadpool: Update parking_lot to 0.9
Signed-off-by: Igor Gnatenko <i.gnatenko.brain@gmail.com>
2019-08-04 15:46:14 +02:00
Neil Locketz
0baceb0e56 Fix typo in desc (#34) 2019-07-30 09:35:57 -07:00
Michael Snoyman
6be1f37f6c Minor typo corrections in docs (#33) 2019-07-25 11:46:11 +06:00
Nikolay Kim
a742768feb bump version 2019-07-24 14:16:25 +06:00
Marat Safin
f913872159 add rustls support for connect (#31) 2019-07-24 14:14:26 +06:00
Nikolay Kim
41145040e1 remove ClonableService 2019-07-19 11:03:16 +06:00
Nikolay Kim
311bb14d97 add unix domain sockets support #3 2019-07-18 17:05:40 +06:00
Nikolay Kim
2955e49d78 add unix domain sockets support 2019-07-18 16:43:42 +06:00
Nikolay Kim
9d1b428b34 undeprecate framed transport 2019-07-17 13:31:00 +06:00
Nikolay Kim
42d526bced mark some fn as unsafe 2019-07-17 11:16:38 +06:00
Nikolay Kim
23a230a83b deprecate ClonableService and FramedTransport 2019-07-17 10:57:52 +06:00
53 changed files with 1051 additions and 220 deletions

View File

@@ -10,9 +10,10 @@ matrix:
include: include:
- rust: stable - rust: stable
- rust: beta - rust: beta
- rust: nightly-2019-03-02 - rust: 1.36.0
- rust: nightly-2019-06-15
allow_failures: allow_failures:
- rust: nightly-2019-03-02 - rust: nightly-2019-06-15
env: env:
global: global:
@@ -25,8 +26,8 @@ before_install:
- sudo apt-get install -y openssl libssl-dev libelf-dev libdw-dev cmake gcc binutils-dev libiberty-dev - sudo apt-get install -y openssl libssl-dev libelf-dev libdw-dev cmake gcc binutils-dev libiberty-dev
before_cache: | before_cache: |
if [[ "$TRAVIS_RUST_VERSION" == "nightly-2019-03-02" ]]; then if [[ "$TRAVIS_RUST_VERSION" == "nightly-2019-06-15" ]]; then
RUSTFLAGS="--cfg procmacro2_semver_exempt" cargo install cargo-tarpaulin RUSTFLAGS="--cfg procmacro2_semver_exempt" cargo install --version 0.6.11 cargo-tarpaulin
fi fi
# Add clippy # Add clippy
@@ -35,14 +36,14 @@ before_script:
script: script:
- | - |
if [[ "$TRAVIS_RUST_VERSION" != "nightly-2019-03-02" ]]; then if [[ "$TRAVIS_RUST_VERSION" != "nightly-2019-06-15" ]]; then
cargo clean cargo clean
cargo test --all --all-features -- --nocapture cargo test --all --all-features -- --nocapture
fi fi
after_success: after_success:
- | - |
if [[ "$TRAVIS_RUST_VERSION" == "nightly-2019-03-02" ]]; then if [[ "$TRAVIS_RUST_VERSION" == "nightly-2019-06-15" ]]; then
taskset -c 0 cargo tarpaulin --all --all-features --out Xml taskset -c 0 cargo tarpaulin --all --all-features --out Xml
echo "Uploaded code coverage" echo "Uploaded code coverage"
bash <(curl -s https://codecov.io/bash) bash <(curl -s https://codecov.io/bash)

View File

@@ -2,7 +2,7 @@
name = "actix-net" name = "actix-net"
version = "0.3.0" version = "0.3.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix net - framework for the compisible network services for Rust" description = "Actix net - framework for the composable network services for Rust"
readme = "README.md" readme = "README.md"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
homepage = "https://actix.rs" homepage = "https://actix.rs"

View File

@@ -7,7 +7,7 @@ Actix net - framework for composable network services
* [API Documentation (Development)](https://actix.rs/actix-net/actix_net/) * [API Documentation (Development)](https://actix.rs/actix-net/actix_net/)
* [Chat on gitter](https://gitter.im/actix/actix) * [Chat on gitter](https://gitter.im/actix/actix)
* Cargo package: [actix-net](https://crates.io/crates/actix-net) * Cargo package: [actix-net](https://crates.io/crates/actix-net)
* Minimum supported Rust version: 1.32 or later * Minimum supported Rust version: 1.36 or later
## Example ## Example

View File

@@ -1,5 +1,21 @@
# Changes # Changes
## [0.2.5] - 2019-09-05
* Add `TcpConnectService`
## [0.2.4] - 2019-09-02
* Use arbiter's storage for default async resolver
## [0.2.3] - 2019-08-05
* Add `ConnectService` and `OpensslConnectService`
## [0.2.2] - 2019-07-24
* Add `rustls` support
## [0.2.1] - 2019-07-17 ## [0.2.1] - 2019-07-17
### Added ### Added

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "actix-connect" name = "actix-connect"
version = "0.2.1" version = "0.2.5"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix Connector - tcp connector service" description = "Actix Connector - tcp connector service"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
@@ -26,6 +26,9 @@ default = ["uri"]
# openssl # openssl
ssl = ["openssl", "tokio-openssl"] ssl = ["openssl", "tokio-openssl"]
#rustls
rust-tls = ["rustls", "tokio-rustls", "webpki"]
# support http::Uri as connect address # support http::Uri as connect address
uri = ["http"] uri = ["http"]
@@ -33,6 +36,7 @@ uri = ["http"]
actix-service = "0.4.0" actix-service = "0.4.0"
actix-codec = "0.1.2" actix-codec = "0.1.2"
actix-utils = "0.4.0" actix-utils = "0.4.0"
actix-rt = "0.2.5"
derive_more = "0.15" derive_more = "0.15"
either = "1.5.2" either = "1.5.2"
futures = "0.1.25" futures = "0.1.25"
@@ -46,6 +50,11 @@ trust-dns-resolver = { version="0.11.0", default-features = false }
openssl = { version="0.10", optional = true } openssl = { version="0.10", optional = true }
tokio-openssl = { version="0.3", optional = true } tokio-openssl = { version="0.3", optional = true }
#rustls
rustls = { version = "0.15.2", optional = true }
tokio-rustls = { version = "0.9.1", optional = true }
webpki = { version = "0.19", optional = true }
[dev-dependencies] [dev-dependencies]
bytes = "0.4" bytes = "0.4"
actix-test-server = { version="0.2.2", features=["ssl"] } actix-test-server = { version="0.2.2", features=["ssl"] }

View File

@@ -18,6 +18,17 @@ impl<T> TcpConnectorFactory<T> {
pub fn new() -> Self { pub fn new() -> Self {
TcpConnectorFactory(PhantomData) TcpConnectorFactory(PhantomData)
} }
/// Create tcp connector service
pub fn service(&self) -> TcpConnector<T> {
TcpConnector(PhantomData)
}
}
impl<T> Default for TcpConnectorFactory<T> {
fn default() -> Self {
TcpConnectorFactory(PhantomData)
}
} }
impl<T> Clone for TcpConnectorFactory<T> { impl<T> Clone for TcpConnectorFactory<T> {
@@ -36,12 +47,12 @@ impl<T: Address> NewService for TcpConnectorFactory<T> {
type Future = FutureResult<Self::Service, Self::InitError>; type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self, _: &()) -> Self::Future { fn new_service(&self, _: &()) -> Self::Future {
ok(TcpConnector(PhantomData)) ok(self.service())
} }
} }
/// Tcp connector service /// Tcp connector service
#[derive(Debug)] #[derive(Default, Debug)]
pub struct TcpConnector<T>(PhantomData<T>); pub struct TcpConnector<T>(PhantomData<T>);
impl<T> TcpConnector<T> { impl<T> TcpConnector<T> {

View File

@@ -10,12 +10,11 @@
#[macro_use] #[macro_use]
extern crate log; extern crate log;
use std::cell::RefCell;
mod connect; mod connect;
mod connector; mod connector;
mod error; mod error;
mod resolver; mod resolver;
mod service;
pub mod ssl; pub mod ssl;
#[cfg(feature = "uri")] #[cfg(feature = "uri")]
@@ -29,7 +28,9 @@ pub use self::connect::{Address, Connect, Connection};
pub use self::connector::{TcpConnector, TcpConnectorFactory}; pub use self::connector::{TcpConnector, TcpConnectorFactory};
pub use self::error::ConnectError; pub use self::error::ConnectError;
pub use self::resolver::{Resolver, ResolverFactory}; pub use self::resolver::{Resolver, ResolverFactory};
pub use self::service::{ConnectService, ConnectServiceFactory, TcpConnectService};
use actix_rt::Arbiter;
use actix_service::{NewService, Service, ServiceExt}; use actix_service::{NewService, Service, ServiceExt};
use tokio_tcp::TcpStream; use tokio_tcp::TcpStream;
@@ -39,16 +40,12 @@ pub fn start_resolver(cfg: ResolverConfig, opts: ResolverOpts) -> AsyncResolver
resolver resolver
} }
thread_local! { struct DefaultResolver(AsyncResolver);
static DEFAULT_RESOLVER: RefCell<Option<AsyncResolver>> = RefCell::new(None);
}
pub(crate) fn get_default_resolver() -> AsyncResolver { pub(crate) fn get_default_resolver() -> AsyncResolver {
DEFAULT_RESOLVER.with(|cell| { if Arbiter::contains_item::<DefaultResolver>() {
if let Some(ref resolver) = *cell.borrow() { return Arbiter::get_item(|item: &DefaultResolver| item.0.clone());
return resolver.clone(); } else {
}
let (cfg, opts) = match read_system_conf() { let (cfg, opts) = match read_system_conf() {
Ok((cfg, opts)) => (cfg, opts), Ok((cfg, opts)) => (cfg, opts),
Err(e) => { Err(e) => {
@@ -60,9 +57,9 @@ pub(crate) fn get_default_resolver() -> AsyncResolver {
let (resolver, bg) = AsyncResolver::new(cfg, opts); let (resolver, bg) = AsyncResolver::new(cfg, opts);
tokio_current_thread::spawn(bg); tokio_current_thread::spawn(bg);
*cell.borrow_mut() = Some(resolver.clone()); Arbiter::set_item(DefaultResolver(resolver.clone()));
resolver resolver
}) }
} }
pub fn start_default_resolver() -> AsyncResolver { pub fn start_default_resolver() -> AsyncResolver {

View File

@@ -25,6 +25,13 @@ impl<T> ResolverFactory<T> {
_t: PhantomData, _t: PhantomData,
} }
} }
pub fn service(&self) -> Resolver<T> {
Resolver {
resolver: self.resolver.clone(),
_t: PhantomData,
}
}
} }
impl<T> Default for ResolverFactory<T> { impl<T> Default for ResolverFactory<T> {
@@ -55,10 +62,7 @@ impl<T: Address> NewService for ResolverFactory<T> {
type Future = FutureResult<Self::Service, Self::InitError>; type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self, _: &()) -> Self::Future { fn new_service(&self, _: &()) -> Self::Future {
ok(Resolver { ok(self.service())
resolver: self.resolver.clone(),
_t: PhantomData,
})
} }
} }
@@ -109,17 +113,15 @@ impl<T: Address> Service for Resolver<T> {
fn call(&mut self, mut req: Connect<T>) -> Self::Future { fn call(&mut self, mut req: Connect<T>) -> Self::Future {
if req.addr.is_some() { if req.addr.is_some() {
Either::B(ok(req)) Either::B(ok(req))
} else if let Ok(ip) = req.host().parse() {
req.addr = Some(either::Either::Left(SocketAddr::new(ip, req.port())));
Either::B(ok(req))
} else { } else {
if let Ok(ip) = req.host().parse() { trace!("DNS resolver: resolving host {:?}", req.host());
req.addr = Some(either::Either::Left(SocketAddr::new(ip, req.port()))); if self.resolver.is_none() {
Either::B(ok(req)) self.resolver = Some(get_default_resolver());
} else {
trace!("DNS resolver: resolving host {:?}", req.host());
if self.resolver.is_none() {
self.resolver = Some(get_default_resolver());
}
Either::A(ResolverFuture::new(req, self.resolver.as_ref().unwrap()))
} }
Either::A(ResolverFuture::new(req, self.resolver.as_ref().unwrap()))
} }
} }
} }

View File

@@ -0,0 +1,183 @@
use actix_service::{NewService, Service};
use futures::future::{ok, FutureResult};
use futures::{try_ready, Async, Future, Poll};
use tokio_tcp::TcpStream;
use trust_dns_resolver::AsyncResolver;
use crate::connect::{Address, Connect, Connection};
use crate::connector::{TcpConnector, TcpConnectorFactory};
use crate::error::ConnectError;
use crate::resolver::{Resolver, ResolverFactory};
pub struct ConnectServiceFactory<T> {
tcp: TcpConnectorFactory<T>,
resolver: ResolverFactory<T>,
}
impl<T> ConnectServiceFactory<T> {
/// Construct new ConnectService factory
pub fn new() -> Self {
ConnectServiceFactory {
tcp: TcpConnectorFactory::default(),
resolver: ResolverFactory::default(),
}
}
/// Construct new connect service with custom dns resolver
pub fn with_resolver(resolver: AsyncResolver) -> Self {
ConnectServiceFactory {
tcp: TcpConnectorFactory::default(),
resolver: ResolverFactory::new(resolver),
}
}
/// Construct new service
pub fn service(&self) -> ConnectService<T> {
ConnectService {
tcp: self.tcp.service(),
resolver: self.resolver.service(),
}
}
/// Construct new tcp stream service
pub fn tcp_service(&self) -> TcpConnectService<T> {
TcpConnectService {
tcp: self.tcp.service(),
resolver: self.resolver.service(),
}
}
}
impl<T> Default for ConnectServiceFactory<T> {
fn default() -> Self {
ConnectServiceFactory {
tcp: TcpConnectorFactory::default(),
resolver: ResolverFactory::default(),
}
}
}
impl<T> Clone for ConnectServiceFactory<T> {
fn clone(&self) -> Self {
ConnectServiceFactory {
tcp: self.tcp.clone(),
resolver: self.resolver.clone(),
}
}
}
impl<T: Address> NewService for ConnectServiceFactory<T> {
type Request = Connect<T>;
type Response = Connection<T, TcpStream>;
type Error = ConnectError;
type Config = ();
type Service = ConnectService<T>;
type InitError = ();
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self, _: &()) -> Self::Future {
ok(self.service())
}
}
#[derive(Clone)]
pub struct ConnectService<T> {
tcp: TcpConnector<T>,
resolver: Resolver<T>,
}
impl<T: Address> Service for ConnectService<T> {
type Request = Connect<T>;
type Response = Connection<T, TcpStream>;
type Error = ConnectError;
type Future = ConnectServiceResponse<T>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, req: Connect<T>) -> Self::Future {
ConnectServiceResponse {
fut1: Some(self.resolver.call(req)),
fut2: None,
tcp: self.tcp.clone(),
}
}
}
pub struct ConnectServiceResponse<T: Address> {
fut1: Option<<Resolver<T> as Service>::Future>,
fut2: Option<<TcpConnector<T> as Service>::Future>,
tcp: TcpConnector<T>,
}
impl<T: Address> Future for ConnectServiceResponse<T> {
type Item = Connection<T, TcpStream>;
type Error = ConnectError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(ref mut fut) = self.fut1 {
let res = try_ready!(fut.poll());
let _ = self.fut1.take();
self.fut2 = Some(self.tcp.call(res));
}
if let Some(ref mut fut) = self.fut2 {
return fut.poll();
}
Ok(Async::NotReady)
}
}
#[derive(Clone)]
pub struct TcpConnectService<T> {
tcp: TcpConnector<T>,
resolver: Resolver<T>,
}
impl<T: Address> Service for TcpConnectService<T> {
type Request = Connect<T>;
type Response = TcpStream;
type Error = ConnectError;
type Future = TcpConnectServiceResponse<T>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, req: Connect<T>) -> Self::Future {
TcpConnectServiceResponse {
fut1: Some(self.resolver.call(req)),
fut2: None,
tcp: self.tcp.clone(),
}
}
}
pub struct TcpConnectServiceResponse<T: Address> {
fut1: Option<<Resolver<T> as Service>::Future>,
fut2: Option<<TcpConnector<T> as Service>::Future>,
tcp: TcpConnector<T>,
}
impl<T: Address> Future for TcpConnectServiceResponse<T> {
type Item = TcpStream;
type Error = ConnectError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(ref mut fut) = self.fut1 {
let res = try_ready!(fut.poll());
let _ = self.fut1.take();
self.fut2 = Some(self.tcp.call(res));
}
if let Some(ref mut fut) = self.fut2 {
if let Async::Ready(conn) = fut.poll()? {
return Ok(Async::Ready(conn.into_parts().0));
}
}
Ok(Async::NotReady)
}
}

View File

@@ -3,4 +3,10 @@
#[cfg(feature = "ssl")] #[cfg(feature = "ssl")]
mod openssl; mod openssl;
#[cfg(feature = "ssl")] #[cfg(feature = "ssl")]
pub use self::openssl::OpensslConnector; pub use self::openssl::{
OpensslConnectService, OpensslConnectServiceFactory, OpensslConnector,
};
#[cfg(feature = "rust-tls")]
mod rustls;
#[cfg(feature = "rust-tls")]
pub use self::rustls::RustlsConnector;

View File

@@ -1,13 +1,17 @@
use std::fmt;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::{fmt, io};
use actix_codec::{AsyncRead, AsyncWrite}; use actix_codec::{AsyncRead, AsyncWrite};
use actix_service::{NewService, Service}; use actix_service::{NewService, Service};
use futures::{future::ok, future::FutureResult, Async, Future, Poll}; use futures::{future::ok, future::FutureResult, try_ready, Async, Future, Poll};
use openssl::ssl::{HandshakeError, SslConnector}; use openssl::ssl::{HandshakeError, SslConnector};
use tokio_openssl::{ConnectAsync, SslConnectorExt, SslStream}; use tokio_openssl::{ConnectAsync, SslConnectorExt, SslStream};
use tokio_tcp::TcpStream;
use trust_dns_resolver::AsyncResolver;
use crate::{Address, Connection}; use crate::{
Address, Connect, ConnectError, ConnectService, ConnectServiceFactory, Connection,
};
/// Openssl connector factory /// Openssl connector factory
pub struct OpensslConnector<T, U> { pub struct OpensslConnector<T, U> {
@@ -77,6 +81,15 @@ pub struct OpensslConnectorService<T, U> {
_t: PhantomData<(T, U)>, _t: PhantomData<(T, U)>,
} }
impl<T, U> Clone for OpensslConnectorService<T, U> {
fn clone(&self) -> Self {
Self {
connector: self.connector.clone(),
_t: PhantomData,
}
}
}
impl<T: Address, U> Service for OpensslConnectorService<T, U> impl<T: Address, U> Service for OpensslConnectorService<T, U>
where where
U: AsyncRead + AsyncWrite + fmt::Debug, U: AsyncRead + AsyncWrite + fmt::Debug,
@@ -126,3 +139,113 @@ where
} }
} }
} }
pub struct OpensslConnectServiceFactory<T> {
tcp: ConnectServiceFactory<T>,
openssl: OpensslConnector<T, TcpStream>,
}
impl<T> OpensslConnectServiceFactory<T> {
/// Construct new OpensslConnectService factory
pub fn new(connector: SslConnector) -> Self {
OpensslConnectServiceFactory {
tcp: ConnectServiceFactory::default(),
openssl: OpensslConnector::new(connector),
}
}
/// Construct new connect service with custom dns resolver
pub fn with_resolver(connector: SslConnector, resolver: AsyncResolver) -> Self {
OpensslConnectServiceFactory {
tcp: ConnectServiceFactory::with_resolver(resolver),
openssl: OpensslConnector::new(connector),
}
}
/// Construct openssl connect service
pub fn service(&self) -> OpensslConnectService<T> {
OpensslConnectService {
tcp: self.tcp.service(),
openssl: OpensslConnectorService {
connector: self.openssl.connector.clone(),
_t: PhantomData,
},
}
}
}
impl<T> Clone for OpensslConnectServiceFactory<T> {
fn clone(&self) -> Self {
OpensslConnectServiceFactory {
tcp: self.tcp.clone(),
openssl: self.openssl.clone(),
}
}
}
impl<T: Address> NewService for OpensslConnectServiceFactory<T> {
type Request = Connect<T>;
type Response = SslStream<TcpStream>;
type Error = ConnectError;
type Config = ();
type Service = OpensslConnectService<T>;
type InitError = ();
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self, _: &()) -> Self::Future {
ok(self.service())
}
}
#[derive(Clone)]
pub struct OpensslConnectService<T> {
tcp: ConnectService<T>,
openssl: OpensslConnectorService<T, TcpStream>,
}
impl<T: Address> Service for OpensslConnectService<T> {
type Request = Connect<T>;
type Response = SslStream<TcpStream>;
type Error = ConnectError;
type Future = OpensslConnectServiceResponse<T>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, req: Connect<T>) -> Self::Future {
OpensslConnectServiceResponse {
fut1: Some(self.tcp.call(req)),
fut2: None,
openssl: self.openssl.clone(),
}
}
}
pub struct OpensslConnectServiceResponse<T: Address> {
fut1: Option<<ConnectService<T> as Service>::Future>,
fut2: Option<<OpensslConnectorService<T, TcpStream> as Service>::Future>,
openssl: OpensslConnectorService<T, TcpStream>,
}
impl<T: Address> Future for OpensslConnectServiceResponse<T> {
type Item = SslStream<TcpStream>;
type Error = ConnectError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(ref mut fut) = self.fut1 {
let res = try_ready!(fut.poll());
let _ = self.fut1.take();
self.fut2 = Some(self.openssl.call(res));
}
if let Some(ref mut fut) = self.fut2 {
let connect = try_ready!(fut
.poll()
.map_err(|e| ConnectError::Io(io::Error::new(io::ErrorKind::Other, e))));
Ok(Async::Ready(connect.into_parts().0))
} else {
Ok(Async::NotReady)
}
}
}

View File

@@ -0,0 +1,133 @@
use std::fmt;
use std::marker::PhantomData;
use actix_codec::{AsyncRead, AsyncWrite};
use actix_service::{NewService, Service};
use futures::{future::ok, future::FutureResult, Async, Future, Poll};
use std::sync::Arc;
use tokio_rustls::{
rustls::{ClientConfig, ClientSession},
Connect, TlsConnector, TlsStream,
};
use webpki::DNSNameRef;
use crate::{Address, Connection};
/// Rustls connector factory
pub struct RustlsConnector<T, U> {
connector: Arc<ClientConfig>,
_t: PhantomData<(T, U)>,
}
impl<T, U> RustlsConnector<T, U> {
pub fn new(connector: Arc<ClientConfig>) -> Self {
RustlsConnector {
connector,
_t: PhantomData,
}
}
}
impl<T, U> RustlsConnector<T, U>
where
T: Address,
U: AsyncRead + AsyncWrite + fmt::Debug,
{
pub fn service(
connector: Arc<ClientConfig>,
) -> impl Service<
Request = Connection<T, U>,
Response = Connection<T, TlsStream<U, ClientSession>>,
Error = std::io::Error,
> {
RustlsConnectorService {
connector: connector,
_t: PhantomData,
}
}
}
impl<T, U> Clone for RustlsConnector<T, U> {
fn clone(&self) -> Self {
Self {
connector: self.connector.clone(),
_t: PhantomData,
}
}
}
impl<T: Address, U> NewService for RustlsConnector<T, U>
where
U: AsyncRead + AsyncWrite + fmt::Debug,
{
type Request = Connection<T, U>;
type Response = Connection<T, TlsStream<U, ClientSession>>;
type Error = std::io::Error;
type Config = ();
type Service = RustlsConnectorService<T, U>;
type InitError = ();
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self, _: &()) -> Self::Future {
ok(RustlsConnectorService {
connector: self.connector.clone(),
_t: PhantomData,
})
}
}
pub struct RustlsConnectorService<T, U> {
connector: Arc<ClientConfig>,
_t: PhantomData<(T, U)>,
}
impl<T: Address, U> Service for RustlsConnectorService<T, U>
where
U: AsyncRead + AsyncWrite + fmt::Debug,
{
type Request = Connection<T, U>;
type Response = Connection<T, TlsStream<U, ClientSession>>;
type Error = std::io::Error;
type Future = ConnectAsyncExt<T, U>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, stream: Connection<T, U>) -> Self::Future {
trace!("SSL Handshake start for: {:?}", stream.host());
let (io, stream) = stream.replace(());
let host = DNSNameRef::try_from_ascii_str(stream.host()).unwrap();
ConnectAsyncExt {
fut: TlsConnector::from(self.connector.clone()).connect(host, io),
stream: Some(stream),
}
}
}
pub struct ConnectAsyncExt<T, U> {
fut: Connect<U>,
stream: Option<Connection<T, ()>>,
}
impl<T: Address, U> Future for ConnectAsyncExt<T, U>
where
U: AsyncRead + AsyncWrite + fmt::Debug,
{
type Item = Connection<T, TlsStream<U, ClientSession>>;
type Error = std::io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.fut.poll().map_err(|e| {
trace!("SSL Handshake error: {:?}", e);
e
})? {
Async::Ready(stream) => {
let s = self.stream.take().unwrap();
trace!("SSL Handshake success: {:?}", s.host());
Ok(Async::Ready(s.replace(stream).1))
}
Async::NotReady => Ok(Async::NotReady),
}
}
}

View File

@@ -26,6 +26,22 @@ fn test_string() {
assert_eq!(con.peer_addr().unwrap(), srv.addr()); assert_eq!(con.peer_addr().unwrap(), srv.addr());
} }
#[cfg(feature = "rust-tls")]
#[test]
fn test_rustls_string() {
let mut srv = TestServer::with(|| {
service_fn(|io: Io<tokio_tcp::TcpStream>| {
Framed::new(io.into_parts().0, BytesCodec)
.send(Bytes::from_static(b"test"))
.then(|_| Ok::<_, ()>(()))
})
});
let mut conn = default_connector();
let addr = format!("localhost:{}", srv.port());
let con = srv.run_on(move || conn.call(addr.into())).unwrap();
assert_eq!(con.peer_addr().unwrap(), srv.addr());
}
#[test] #[test]
fn test_static_str() { fn test_static_str() {
let mut srv = TestServer::with(|| { let mut srv = TestServer::with(|| {
@@ -107,3 +123,20 @@ fn test_uri() {
let con = srv.run_on(move || conn.call(addr.into())).unwrap(); let con = srv.run_on(move || conn.call(addr.into())).unwrap();
assert_eq!(con.peer_addr().unwrap(), srv.addr()); assert_eq!(con.peer_addr().unwrap(), srv.addr());
} }
#[cfg(feature = "rust-tls")]
#[test]
fn test_rustls_uri() {
let mut srv = TestServer::with(|| {
service_fn(|io: Io<tokio_tcp::TcpStream>| {
Framed::new(io.into_parts().0, BytesCodec)
.send(Bytes::from_static(b"test"))
.then(|_| Ok::<_, ()>(()))
})
});
let mut conn = default_connector();
let addr = Uri::try_from(format!("https://localhost:{}", srv.port())).unwrap();
let con = srv.run_on(move || conn.call(addr.into())).unwrap();
assert_eq!(con.peer_addr().unwrap(), srv.addr());
}

View File

@@ -1,5 +1,5 @@
# Changes # Changes
## [0.1.0] - 2019-xx-xx ## [0.1.0] - 2019-07-17
* Initial release * Initial release

View File

@@ -29,7 +29,7 @@ impl<T> Cell<T> {
} }
} }
pub fn get_mut(&mut self) -> &mut T { pub(crate) unsafe fn get_mut(&mut self) -> &mut T {
unsafe { &mut *self.inner.as_ref().get() } &mut *self.inner.as_ref().get()
} }
} }

View File

@@ -44,7 +44,7 @@ where
framed: Framed<T, U>, framed: Framed<T, U>,
rx: Option<mpsc::UnboundedReceiver<FramedMessage<<U as Encoder>::Item>>>, rx: Option<mpsc::UnboundedReceiver<FramedMessage<<U as Encoder>::Item>>>,
inner: Cell<FramedDispatcherInner<<U as Encoder>::Item, S::Error>>, inner: Cell<FramedDispatcherInner<<U as Encoder>::Item, S::Error>>,
disconnect: Option<Rc<Fn(&mut St, bool)>>, disconnect: Option<Rc<dyn Fn(&mut St, bool)>>,
} }
impl<St, S, T, U> FramedDispatcher<St, S, T, U> impl<St, S, T, U> FramedDispatcher<St, S, T, U>
@@ -63,7 +63,7 @@ where
service: F, service: F,
rx: mpsc::UnboundedReceiver<FramedMessage<<U as Encoder>::Item>>, rx: mpsc::UnboundedReceiver<FramedMessage<<U as Encoder>::Item>>,
sink: Sink<<U as Encoder>::Item>, sink: Sink<<U as Encoder>::Item>,
disconnect: Option<Rc<Fn(&mut St, bool)>>, disconnect: Option<Rc<dyn Fn(&mut St, bool)>>,
) -> Self { ) -> Self {
FramedDispatcher { FramedDispatcher {
framed, framed,
@@ -153,7 +153,7 @@ where
}; };
let mut cell = self.inner.clone(); let mut cell = self.inner.clone();
cell.get_mut().task.register(); unsafe { cell.get_mut().task.register() };
tokio_current_thread::spawn( tokio_current_thread::spawn(
self.service self.service
.call(Item::new(self.state.clone(), self.sink.clone(), item)) .call(Item::new(self.state.clone(), self.sink.clone(), item))
@@ -163,9 +163,11 @@ where
Ok(None) => return Ok(()), Ok(None) => return Ok(()),
Err(err) => Err(err), Err(err) => Err(err),
}; };
let inner = cell.get_mut(); unsafe {
inner.buf.push_back(item); let inner = cell.get_mut();
inner.task.notify(); inner.buf.push_back(item);
inner.task.notify();
}
Ok(()) Ok(())
}), }),
); );
@@ -181,7 +183,7 @@ where
/// write to framed object /// write to framed object
fn poll_write(&mut self) -> bool { fn poll_write(&mut self) -> bool {
let inner = self.inner.get_mut(); let inner = unsafe { self.inner.get_mut() };
let mut rx_done = self.rx.is_none(); let mut rx_done = self.rx.is_none();
let mut buf_empty = inner.buf.is_empty(); let mut buf_empty = inner.buf.is_empty();
loop { loop {

View File

@@ -62,7 +62,7 @@ impl<St, Codec> Builder<St, Codec> {
pub struct ServiceBuilder<St, C, Io, Codec> { pub struct ServiceBuilder<St, C, Io, Codec> {
connect: C, connect: C,
disconnect: Option<Rc<Fn(&mut St, bool)>>, disconnect: Option<Rc<dyn Fn(&mut St, bool)>>,
_t: PhantomData<(St, Io, Codec)>, _t: PhantomData<(St, Io, Codec)>,
} }
@@ -113,7 +113,7 @@ where
pub struct NewServiceBuilder<St, C, Io, Codec> { pub struct NewServiceBuilder<St, C, Io, Codec> {
connect: C, connect: C,
disconnect: Option<Rc<Fn(&mut St, bool)>>, disconnect: Option<Rc<dyn Fn(&mut St, bool)>>,
_t: PhantomData<(St, Io, Codec)>, _t: PhantomData<(St, Io, Codec)>,
} }
@@ -170,7 +170,7 @@ where
pub(crate) struct FramedService<St, C, T, Io, Codec, Cfg> { pub(crate) struct FramedService<St, C, T, Io, Codec, Cfg> {
connect: C, connect: C,
handler: Rc<T>, handler: Rc<T>,
disconnect: Option<Rc<Fn(&mut St, bool)>>, disconnect: Option<Rc<dyn Fn(&mut St, bool)>>,
_t: PhantomData<(St, Io, Codec, Cfg)>, _t: PhantomData<(St, Io, Codec, Cfg)>,
} }
@@ -198,7 +198,7 @@ where
type Error = ServiceError<C::Error, Codec>; type Error = ServiceError<C::Error, Codec>;
type InitError = C::InitError; type InitError = C::InitError;
type Service = FramedServiceImpl<St, C::Service, T, Io, Codec>; type Service = FramedServiceImpl<St, C::Service, T, Io, Codec>;
type Future = Box<Future<Item = Self::Service, Error = Self::InitError>>; type Future = Box<dyn Future<Item = Self::Service, Error = Self::InitError>>;
fn new_service(&self, _: &Cfg) -> Self::Future { fn new_service(&self, _: &Cfg) -> Self::Future {
let handler = self.handler.clone(); let handler = self.handler.clone();
@@ -221,7 +221,7 @@ where
pub struct FramedServiceImpl<St, C, T, Io, Codec> { pub struct FramedServiceImpl<St, C, T, Io, Codec> {
connect: C, connect: C,
handler: Rc<T>, handler: Rc<T>,
disconnect: Option<Rc<Fn(&mut St, bool)>>, disconnect: Option<Rc<dyn Fn(&mut St, bool)>>,
_t: PhantomData<(St, Io, Codec)>, _t: PhantomData<(St, Io, Codec)>,
} }
@@ -280,7 +280,7 @@ where
<Codec as Encoder>::Error: std::fmt::Debug, <Codec as Encoder>::Error: std::fmt::Debug,
{ {
inner: FramedServiceImplResponseInner<St, Io, Codec, C, T>, inner: FramedServiceImplResponseInner<St, Io, Codec, C, T>,
disconnect: Option<Rc<Fn(&mut St, bool)>>, disconnect: Option<Rc<dyn Fn(&mut St, bool)>>,
} }
enum FramedServiceImplResponseInner<St, Io, Codec, C, T> enum FramedServiceImplResponseInner<St, Io, Codec, C, T>

View File

@@ -1,5 +1,12 @@
# Changes # Changes
## [0.2.5] - 2019-09-02
### Added
* Add arbiter specific storage
## [0.2.4] - 2019-07-17 ## [0.2.4] - 2019-07-17
### Changed ### Changed

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "actix-rt" name = "actix-rt"
version = "0.2.4" version = "0.2.5"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix runtime" description = "Actix runtime"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]

View File

@@ -1,3 +1,4 @@
use std::any::{Any, TypeId};
use std::cell::{Cell, RefCell}; use std::cell::{Cell, RefCell};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
@@ -16,15 +17,16 @@ use copyless::BoxHelper;
thread_local!( thread_local!(
static ADDR: RefCell<Option<Arbiter>> = RefCell::new(None); static ADDR: RefCell<Option<Arbiter>> = RefCell::new(None);
static RUNNING: Cell<bool> = Cell::new(false); static RUNNING: Cell<bool> = Cell::new(false);
static Q: RefCell<Vec<Box<Future<Item = (), Error = ()>>>> = RefCell::new(Vec::new()); static Q: RefCell<Vec<Box<dyn Future<Item = (), Error = ()>>>> = RefCell::new(Vec::new());
static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new());
); );
pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0); pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0);
pub(crate) enum ArbiterCommand { pub(crate) enum ArbiterCommand {
Stop, Stop,
Execute(Box<Future<Item = (), Error = ()> + Send>), Execute(Box<dyn Future<Item = (), Error = ()> + Send>),
ExecuteFn(Box<FnExec>), ExecuteFn(Box<dyn FnExec>),
} }
impl fmt::Debug for ArbiterCommand { impl fmt::Debug for ArbiterCommand {
@@ -56,6 +58,7 @@ impl Arbiter {
let arb = Arbiter(tx); let arb = Arbiter(tx);
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
RUNNING.with(|cell| cell.set(false)); RUNNING.with(|cell| cell.set(false));
STORAGE.with(|cell| cell.borrow_mut().clear());
Arbiter::spawn(ArbiterController { stop: None, rx }); Arbiter::spawn(ArbiterController { stop: None, rx });
arb arb
@@ -90,6 +93,7 @@ impl Arbiter {
let (stop, stop_rx) = channel(); let (stop, stop_rx) = channel();
RUNNING.with(|cell| cell.set(true)); RUNNING.with(|cell| cell.set(true));
STORAGE.with(|cell| cell.borrow_mut().clear());
System::set_current(sys); System::set_current(sys);
@@ -180,7 +184,7 @@ impl Arbiter {
let _ = self let _ = self
.0 .0
.unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || { .unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || {
let _ = f(); f();
}))); })));
} }
@@ -202,6 +206,50 @@ impl Arbiter {
}))); })));
rx rx
} }
/// Set item to arbiter storage
pub fn set_item<T: 'static>(item: T) {
STORAGE.with(move |cell| cell.borrow_mut().insert(TypeId::of::<T>(), Box::new(item)));
}
/// Check if arbiter storage contains item
pub fn contains_item<T: 'static>() -> bool {
STORAGE.with(move |cell| cell.borrow().get(&TypeId::of::<T>()).is_some())
}
/// Get a reference to a type previously inserted on this arbiter's storage.
///
/// Panics is item is not inserted
pub fn get_item<T: 'static, F, R>(mut f: F) -> R
where
F: FnMut(&T) -> R,
{
STORAGE.with(move |cell| {
let st = cell.borrow();
let item = st
.get(&TypeId::of::<T>())
.and_then(|boxed| (&**boxed as &(dyn Any + 'static)).downcast_ref())
.unwrap();
f(item)
})
}
/// Get a mutable reference to a type previously inserted on this arbiter's storage.
///
/// Panics is item is not inserted
pub fn get_mut_item<T: 'static, F, R>(mut f: F) -> R
where
F: FnMut(&mut T) -> R,
{
STORAGE.with(move |cell| {
let mut st = cell.borrow_mut();
let item = st
.get_mut(&TypeId::of::<T>())
.and_then(|boxed| (&mut **boxed as &mut (dyn Any + 'static)).downcast_mut())
.unwrap();
f(item)
})
}
} }
struct ArbiterController { struct ArbiterController {
@@ -312,7 +360,7 @@ impl<F> FnExec for F
where where
F: FnOnce() + Send + 'static, F: FnOnce() + Send + 'static,
{ {
#[cfg_attr(feature = "cargo-clippy", allow(boxed_local))] #[allow(clippy::boxed_local)]
fn call_box(self: Box<Self>) { fn call_box(self: Box<Self>) {
(*self)() (*self)()
} }

View File

@@ -40,7 +40,7 @@ impl Error for RunError {
fn description(&self) -> &str { fn description(&self) -> &str {
self.inner.description() self.inner.description()
} }
fn cause(&self) -> Option<&Error> { fn cause(&self) -> Option<&dyn Error> {
self.inner.source() self.inner.source()
} }
} }

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "actix-server-config" name = "actix-server-config"
version = "0.1.1" version = "0.1.2"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix server config utils" description = "Actix server config utils"
homepage = "https://actix.rs" homepage = "https://actix.rs"
@@ -14,7 +14,7 @@ name = "actix_server_config"
path = "src/lib.rs" path = "src/lib.rs"
[package.metadata.docs.rs] [package.metadata.docs.rs]
features = ["ssl", "rust-tls"] features = ["ssl", "rust-tls", "uds"]
[features] [features]
default = [] default = []
@@ -25,6 +25,9 @@ ssl = ["tokio-openssl"]
# rustls # rustls
rust-tls = ["rustls", "tokio-rustls"] rust-tls = ["rustls", "tokio-rustls"]
# unix domain sockets
uds = ["tokio-uds"]
[dependencies] [dependencies]
futures = "0.1.25" futures = "0.1.25"
tokio-io = "0.1.12" tokio-io = "0.1.12"
@@ -32,3 +35,4 @@ tokio-tcp = "0.1"
tokio-openssl = { version="0.3.0", optional = true } tokio-openssl = { version="0.3.0", optional = true }
rustls = { version = "0.15.2", optional = true } rustls = { version = "0.15.2", optional = true }
tokio-rustls = { version = "0.9.1", optional = true } tokio-rustls = { version = "0.9.1", optional = true }
tokio-uds = { version="0.2.5", optional = true }

View File

@@ -1,5 +1,12 @@
# Changes # Changes
## [0.1.2] - 2019-07-18
### Added
* Add unix domnain sockets support
## [0.1.1] - 2019-04-16 ## [0.1.1] - 2019-04-16
### Added ### Added

View File

@@ -95,9 +95,9 @@ impl<T, P> Io<T, P> {
/// Return new Io object with new parameter. /// Return new Io object with new parameter.
pub fn set<U>(self, params: U) -> Io<T, U> { pub fn set<U>(self, params: U) -> Io<T, U> {
Io { Io {
params,
io: self.io, io: self.io,
proto: self.proto, proto: self.proto,
params: params,
} }
} }
@@ -216,3 +216,26 @@ impl<T: IoStream> IoStream for tokio_rustls::TlsStream<T, rustls::ServerSession>
self.get_mut().0.set_keepalive(dur) self.get_mut().0.set_keepalive(dur)
} }
} }
#[cfg(all(unix, feature = "uds"))]
impl IoStream for tokio_uds::UnixStream {
#[inline]
fn peer_addr(&self) -> Option<net::SocketAddr> {
None
}
#[inline]
fn set_nodelay(&mut self, _: bool) -> io::Result<()> {
Ok(())
}
#[inline]
fn set_linger(&mut self, _: Option<time::Duration>) -> io::Result<()> {
Ok(())
}
#[inline]
fn set_keepalive(&mut self, _: Option<time::Duration>) -> io::Result<()> {
Ok(())
}
}

View File

@@ -1,5 +1,12 @@
# Changes # Changes
## [0.6.0] - 2019-07-18
### Added
* Support Unix domain sockets #3
## [0.5.1] - 2019-05-18 ## [0.5.1] - 2019-05-18
### Changed ### Changed

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "actix-server" name = "actix-server"
version = "0.5.1" version = "0.6.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix server - General purpose tcp server" description = "Actix server - General purpose tcp server"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
@@ -14,7 +14,7 @@ edition = "2018"
workspace = ".." workspace = ".."
[package.metadata.docs.rs] [package.metadata.docs.rs]
features = ["ssl", "tls", "rust-tls"] features = ["ssl", "tls", "rust-tls", "uds"]
[lib] [lib]
name = "actix_server" name = "actix_server"
@@ -32,15 +32,18 @@ ssl = ["openssl", "tokio-openssl", "actix-server-config/ssl"]
# rustls # rustls
rust-tls = ["rustls", "tokio-rustls", "webpki", "webpki-roots", "actix-server-config/rust-tls"] rust-tls = ["rustls", "tokio-rustls", "webpki", "webpki-roots", "actix-server-config/rust-tls"]
# uds
uds = ["mio-uds", "tokio-uds", "actix-server-config/uds"]
[dependencies] [dependencies]
actix-rt = "0.2.1" actix-rt = "0.2.2"
actix-service = "0.4.0" actix-service = "0.4.1"
actix-server-config = "0.1.1" actix-server-config = "0.1.2"
log = "0.4" log = "0.4"
num_cpus = "1.0" num_cpus = "1.0"
mio = "0.6.13" mio = "0.6.19"
net2 = "0.2" net2 = "0.2"
futures = "0.1" futures = "0.1"
slab = "0.4" slab = "0.4"
@@ -50,6 +53,10 @@ tokio-timer = "0.2.8"
tokio-reactor = "0.1" tokio-reactor = "0.1"
tokio-signal = "0.2" tokio-signal = "0.2"
# unix domain sockets
mio-uds = { version="0.6.7", optional = true }
tokio-uds = { version="0.2.5", optional = true }
# native-tls # native-tls
native-tls = { version="0.2", optional = true } native-tls = { version="0.2", optional = true }
@@ -57,7 +64,7 @@ native-tls = { version="0.2", optional = true }
openssl = { version="0.10", optional = true } openssl = { version="0.10", optional = true }
tokio-openssl = { version="0.3", optional = true } tokio-openssl = { version="0.3", optional = true }
#rustls # rustls
rustls = { version = "0.15.2", optional = true } rustls = { version = "0.15.2", optional = true }
tokio-rustls = { version = "0.9.1", optional = true } tokio-rustls = { version = "0.9.1", optional = true }
webpki = { version = "0.19", optional = true } webpki = { version = "0.19", optional = true }

View File

@@ -1,17 +1,17 @@
use std::sync::mpsc as sync_mpsc; use std::sync::mpsc as sync_mpsc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use std::{io, net, thread}; use std::{io, thread};
use actix_rt::System; use actix_rt::System;
use futures::future::{lazy, Future}; use futures::future::{lazy, Future};
use log::{error, info}; use log::{error, info};
use mio;
use slab::Slab; use slab::Slab;
use tokio_timer::Delay; use tokio_timer::Delay;
use super::server::Server; use crate::server::Server;
use super::worker::{Conn, WorkerClient}; use crate::socket::{SocketAddr, SocketListener, StdListener};
use super::Token; use crate::worker::{Conn, WorkerClient};
use crate::Token;
pub(crate) enum Command { pub(crate) enum Command {
Pause, Pause,
@@ -21,9 +21,9 @@ pub(crate) enum Command {
} }
struct ServerSocketInfo { struct ServerSocketInfo {
addr: net::SocketAddr, addr: SocketAddr,
token: Token, token: Token,
sock: mio::net::TcpListener, sock: SocketListener,
timeout: Option<Instant>, timeout: Option<Instant>,
} }
@@ -84,7 +84,7 @@ impl AcceptLoop {
pub(crate) fn start( pub(crate) fn start(
&mut self, &mut self,
socks: Vec<(Token, net::TcpListener)>, socks: Vec<(Token, StdListener)>,
workers: Vec<WorkerClient>, workers: Vec<WorkerClient>,
) { ) {
let srv = self.srv.take().expect("Can not re-use AcceptInfo"); let srv = self.srv.take().expect("Can not re-use AcceptInfo");
@@ -135,7 +135,7 @@ impl Accept {
rx: sync_mpsc::Receiver<Command>, rx: sync_mpsc::Receiver<Command>,
cmd_reg: mio::Registration, cmd_reg: mio::Registration,
notify_reg: mio::Registration, notify_reg: mio::Registration,
socks: Vec<(Token, net::TcpListener)>, socks: Vec<(Token, StdListener)>,
srv: Server, srv: Server,
workers: Vec<WorkerClient>, workers: Vec<WorkerClient>,
) { ) {
@@ -174,7 +174,7 @@ impl Accept {
fn new( fn new(
rx: sync_mpsc::Receiver<Command>, rx: sync_mpsc::Receiver<Command>,
socks: Vec<(Token, net::TcpListener)>, socks: Vec<(Token, StdListener)>,
workers: Vec<WorkerClient>, workers: Vec<WorkerClient>,
srv: Server, srv: Server,
) -> Accept { ) -> Accept {
@@ -187,10 +187,9 @@ impl Accept {
// Start accept // Start accept
let mut sockets = Slab::new(); let mut sockets = Slab::new();
for (hnd_token, lst) in socks.into_iter() { for (hnd_token, lst) in socks.into_iter() {
let addr = lst.local_addr().unwrap(); let addr = lst.local_addr();
let server = mio::net::TcpListener::from_std(lst)
.expect("Can not create mio::net::TcpListener");
let server = lst.into_listener();
let entry = sockets.vacant_entry(); let entry = sockets.vacant_entry();
let token = entry.key(); let token = entry.key();
@@ -422,12 +421,13 @@ impl Accept {
fn accept(&mut self, token: usize) { fn accept(&mut self, token: usize) {
loop { loop {
let msg = if let Some(info) = self.sockets.get_mut(token) { let msg = if let Some(info) = self.sockets.get_mut(token) {
match info.sock.accept_std() { match info.sock.accept() {
Ok((io, addr)) => Conn { Ok(Some((io, addr))) => Conn {
io, io,
token: info.token, token: info.token,
peer: Some(addr), peer: Some(addr),
}, },
Ok(None) => return,
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return, Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return,
Err(ref e) if connection_error(e) => continue, Err(ref e) if connection_error(e) => continue,
Err(e) => { Err(e) => {

View File

@@ -9,6 +9,7 @@ use futures::{Async, Future, Poll, Stream};
use log::{error, info}; use log::{error, info};
use net2::TcpBuilder; use net2::TcpBuilder;
use num_cpus; use num_cpus;
use tokio_tcp::TcpStream;
use tokio_timer::sleep; use tokio_timer::sleep;
use crate::accept::{AcceptLoop, AcceptNotify, Command}; use crate::accept::{AcceptLoop, AcceptNotify, Command};
@@ -16,6 +17,7 @@ use crate::config::{ConfiguredService, ServiceConfig};
use crate::server::{Server, ServerCommand}; use crate::server::{Server, ServerCommand};
use crate::services::{InternalServiceFactory, ServiceFactory, StreamNewService}; use crate::services::{InternalServiceFactory, ServiceFactory, StreamNewService};
use crate::signals::{Signal, Signals}; use crate::signals::{Signal, Signals};
use crate::socket::StdListener;
use crate::worker::{self, Worker, WorkerAvailability, WorkerClient}; use crate::worker::{self, Worker, WorkerAvailability, WorkerClient};
use crate::{ssl, Token}; use crate::{ssl, Token};
@@ -25,8 +27,8 @@ pub struct ServerBuilder {
token: Token, token: Token,
backlog: i32, backlog: i32,
workers: Vec<(usize, WorkerClient)>, workers: Vec<(usize, WorkerClient)>,
services: Vec<Box<InternalServiceFactory>>, services: Vec<Box<dyn InternalServiceFactory>>,
sockets: Vec<(Token, net::TcpListener)>, sockets: Vec<(Token, StdListener)>,
accept: AcceptLoop, accept: AcceptLoop,
exit: bool, exit: bool,
shutdown_timeout: Duration, shutdown_timeout: Duration,
@@ -151,7 +153,7 @@ impl ServerBuilder {
for (name, lst) in cfg.services { for (name, lst) in cfg.services {
let token = self.token.next(); let token = self.token.next();
srv.stream(token, name, lst.local_addr()?); srv.stream(token, name, lst.local_addr()?);
self.sockets.push((token, lst)); self.sockets.push((token, StdListener::Tcp(lst)));
} }
self.services.push(Box::new(srv)); self.services.push(Box::new(srv));
} }
@@ -163,7 +165,7 @@ impl ServerBuilder {
/// Add new service to the server. /// Add new service to the server.
pub fn bind<F, U, N: AsRef<str>>(mut self, name: N, addr: U, factory: F) -> io::Result<Self> pub fn bind<F, U, N: AsRef<str>>(mut self, name: N, addr: U, factory: F) -> io::Result<Self>
where where
F: ServiceFactory, F: ServiceFactory<TcpStream>,
U: net::ToSocketAddrs, U: net::ToSocketAddrs,
{ {
let sockets = bind_addr(addr, self.backlog)?; let sockets = bind_addr(addr, self.backlog)?;
@@ -176,11 +178,39 @@ impl ServerBuilder {
factory.clone(), factory.clone(),
lst.local_addr()?, lst.local_addr()?,
)); ));
self.sockets.push((token, lst)); self.sockets.push((token, StdListener::Tcp(lst)));
} }
Ok(self) Ok(self)
} }
#[cfg(all(unix, feature = "uds"))]
/// Add new unix domain service to the server.
pub fn bind_uds<F, U, N>(mut self, name: N, addr: U, factory: F) -> io::Result<Self>
where
F: ServiceFactory<tokio_uds::UnixStream>,
N: AsRef<str>,
U: AsRef<std::path::Path>,
{
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::os::unix::net::UnixListener;
// TODO: need to do something with existing paths
let _ = std::fs::remove_file(addr.as_ref());
let lst = UnixListener::bind(addr)?;
let token = self.token.next();
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
self.services.push(StreamNewService::create(
name.as_ref().to_string(),
token,
factory.clone(),
addr,
));
self.sockets.push((token, StdListener::Uds(lst)));
Ok(self)
}
/// Add new service to the server. /// Add new service to the server.
pub fn listen<F, N: AsRef<str>>( pub fn listen<F, N: AsRef<str>>(
mut self, mut self,
@@ -189,7 +219,7 @@ impl ServerBuilder {
factory: F, factory: F,
) -> io::Result<Self> ) -> io::Result<Self>
where where
F: ServiceFactory, F: ServiceFactory<TcpStream>,
{ {
let token = self.token.next(); let token = self.token.next();
self.services.push(StreamNewService::create( self.services.push(StreamNewService::create(
@@ -198,7 +228,7 @@ impl ServerBuilder {
factory, factory,
lst.local_addr()?, lst.local_addr()?,
)); ));
self.sockets.push((token, lst)); self.sockets.push((token, StdListener::Tcp(lst)));
Ok(self) Ok(self)
} }
@@ -243,7 +273,7 @@ impl ServerBuilder {
// start accept thread // start accept thread
for sock in &self.sockets { for sock in &self.sockets {
info!("Starting server on {}", sock.1.local_addr().ok().unwrap()); info!("Starting server on {}", sock.1);
} }
self.accept self.accept
.start(mem::replace(&mut self.sockets, Vec::new()), workers); .start(mem::replace(&mut self.sockets, Vec::new()), workers);
@@ -266,7 +296,7 @@ impl ServerBuilder {
let timeout = self.shutdown_timeout; let timeout = self.shutdown_timeout;
let avail = WorkerAvailability::new(notify); let avail = WorkerAvailability::new(notify);
let worker = WorkerClient::new(idx, tx1, tx2, avail.clone()); let worker = WorkerClient::new(idx, tx1, tx2, avail.clone());
let services: Vec<Box<InternalServiceFactory>> = let services: Vec<Box<dyn InternalServiceFactory>> =
self.services.iter().map(|v| v.clone_factory()).collect(); self.services.iter().map(|v| v.clone_factory()).collect();
Arbiter::new().send(lazy(move || { Arbiter::new().send(lazy(move || {

View File

@@ -17,7 +17,7 @@ use super::Token;
pub struct ServiceConfig { pub struct ServiceConfig {
pub(crate) services: Vec<(String, net::TcpListener)>, pub(crate) services: Vec<(String, net::TcpListener)>,
pub(crate) apply: Option<Box<ServiceRuntimeConfiguration>>, pub(crate) apply: Option<Box<dyn ServiceRuntimeConfiguration>>,
pub(crate) threads: usize, pub(crate) threads: usize,
pub(crate) backlog: i32, pub(crate) backlog: i32,
} }
@@ -75,13 +75,13 @@ impl ServiceConfig {
} }
pub(super) struct ConfiguredService { pub(super) struct ConfiguredService {
rt: Box<ServiceRuntimeConfiguration>, rt: Box<dyn ServiceRuntimeConfiguration>,
names: HashMap<Token, (String, net::SocketAddr)>, names: HashMap<Token, (String, net::SocketAddr)>,
services: HashMap<String, Token>, services: HashMap<String, Token>,
} }
impl ConfiguredService { impl ConfiguredService {
pub(super) fn new(rt: Box<ServiceRuntimeConfiguration>) -> Self { pub(super) fn new(rt: Box<dyn ServiceRuntimeConfiguration>) -> Self {
ConfiguredService { ConfiguredService {
rt, rt,
names: HashMap::new(), names: HashMap::new(),
@@ -100,7 +100,7 @@ impl InternalServiceFactory for ConfiguredService {
&self.names[&token].0 &self.names[&token].0
} }
fn clone_factory(&self) -> Box<InternalServiceFactory> { fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
Box::new(Self { Box::new(Self {
rt: self.rt.clone(), rt: self.rt.clone(),
names: self.names.clone(), names: self.names.clone(),
@@ -108,7 +108,7 @@ impl InternalServiceFactory for ConfiguredService {
}) })
} }
fn create(&self) -> Box<Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> { fn create(&self) -> Box<dyn Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> {
// configure services // configure services
let mut rt = ServiceRuntime::new(self.services.clone()); let mut rt = ServiceRuntime::new(self.services.clone());
self.rt.configure(&mut rt); self.rt.configure(&mut rt);
@@ -156,7 +156,7 @@ impl InternalServiceFactory for ConfiguredService {
} }
pub(super) trait ServiceRuntimeConfiguration: Send { pub(super) trait ServiceRuntimeConfiguration: Send {
fn clone(&self) -> Box<ServiceRuntimeConfiguration>; fn clone(&self) -> Box<dyn ServiceRuntimeConfiguration>;
fn configure(&self, rt: &mut ServiceRuntime); fn configure(&self, rt: &mut ServiceRuntime);
} }
@@ -165,7 +165,7 @@ impl<F> ServiceRuntimeConfiguration for F
where where
F: Fn(&mut ServiceRuntime) + Send + Clone + 'static, F: Fn(&mut ServiceRuntime) + Send + Clone + 'static,
{ {
fn clone(&self) -> Box<ServiceRuntimeConfiguration> { fn clone(&self) -> Box<dyn ServiceRuntimeConfiguration> {
Box::new(self.clone()) Box::new(self.clone())
} }
@@ -181,7 +181,7 @@ fn not_configured(_: &mut ServiceRuntime) {
pub struct ServiceRuntime { pub struct ServiceRuntime {
names: HashMap<String, Token>, names: HashMap<String, Token>,
services: HashMap<Token, BoxedNewService>, services: HashMap<Token, BoxedNewService>,
onstart: Vec<Box<Future<Item = (), Error = ()>>>, onstart: Vec<Box<dyn Future<Item = (), Error = ()>>>,
} }
impl ServiceRuntime { impl ServiceRuntime {
@@ -236,14 +236,14 @@ impl ServiceRuntime {
} }
type BoxedNewService = Box< type BoxedNewService = Box<
NewService< dyn NewService<
Request = (Option<CounterGuard>, ServerMessage), Request = (Option<CounterGuard>, ServerMessage),
Response = (), Response = (),
Error = (), Error = (),
InitError = (), InitError = (),
Config = ServerConfig, Config = ServerConfig,
Service = BoxedServerService, Service = BoxedServerService,
Future = Box<Future<Item = BoxedServerService, Error = ()>>, Future = Box<dyn Future<Item = BoxedServerService, Error = ()>>,
>, >,
>; >;
@@ -265,7 +265,7 @@ where
type InitError = (); type InitError = ();
type Config = ServerConfig; type Config = ServerConfig;
type Service = BoxedServerService; type Service = BoxedServerService;
type Future = Box<Future<Item = BoxedServerService, Error = ()>>; type Future = Box<dyn Future<Item = BoxedServerService, Error = ()>>;
fn new_service(&self, cfg: &ServerConfig) -> Self::Future { fn new_service(&self, cfg: &ServerConfig) -> Self::Future {
Box::new(self.inner.new_service(cfg).map_err(|_| ()).map(|s| { Box::new(self.inner.new_service(cfg).map_err(|_| ()).map(|s| {

View File

@@ -7,6 +7,7 @@ mod counter;
mod server; mod server;
mod services; mod services;
mod signals; mod signals;
mod socket;
pub mod ssl; pub mod ssl;
mod worker; mod worker;
@@ -17,6 +18,9 @@ pub use self::config::{ServiceConfig, ServiceRuntime};
pub use self::server::Server; pub use self::server::Server;
pub use self::services::ServiceFactory; pub use self::services::ServiceFactory;
#[doc(hidden)]
pub use self::socket::FromStream;
#[doc(hidden)] #[doc(hidden)]
pub use self::services::ServiceFactory as StreamServiceFactory; pub use self::services::ServiceFactory as StreamServiceFactory;

View File

@@ -1,4 +1,5 @@
use std::net::{self, SocketAddr}; use std::marker::PhantomData;
use std::net::SocketAddr;
use std::time::Duration; use std::time::Duration;
use actix_rt::spawn; use actix_rt::spawn;
@@ -7,24 +8,23 @@ use actix_service::{NewService, Service};
use futures::future::{err, ok, FutureResult}; use futures::future::{err, ok, FutureResult};
use futures::{Future, Poll}; use futures::{Future, Poll};
use log::error; use log::error;
use tokio_reactor::Handle;
use tokio_tcp::TcpStream;
use super::Token; use super::Token;
use crate::counter::CounterGuard; use crate::counter::CounterGuard;
use crate::socket::{FromStream, StdStream};
/// Server message /// Server message
pub(crate) enum ServerMessage { pub(crate) enum ServerMessage {
/// New stream /// New stream
Connect(net::TcpStream), Connect(StdStream),
/// Gracefull shutdown /// Gracefull shutdown
Shutdown(Duration), Shutdown(Duration),
/// Force shutdown /// Force shutdown
ForceShutdown, ForceShutdown,
} }
pub trait ServiceFactory: Send + Clone + 'static { pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static {
type NewService: NewService<Config = ServerConfig, Request = Io<TcpStream>>; type NewService: NewService<Config = ServerConfig, Request = Io<Stream>>;
fn create(&self) -> Self::NewService; fn create(&self) -> Self::NewService;
} }
@@ -32,13 +32,13 @@ pub trait ServiceFactory: Send + Clone + 'static {
pub(crate) trait InternalServiceFactory: Send { pub(crate) trait InternalServiceFactory: Send {
fn name(&self, token: Token) -> &str; fn name(&self, token: Token) -> &str;
fn clone_factory(&self) -> Box<InternalServiceFactory>; fn clone_factory(&self) -> Box<dyn InternalServiceFactory>;
fn create(&self) -> Box<Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>>; fn create(&self) -> Box<dyn Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>>;
} }
pub(crate) type BoxedServerService = Box< pub(crate) type BoxedServerService = Box<
Service< dyn Service<
Request = (Option<CounterGuard>, ServerMessage), Request = (Option<CounterGuard>, ServerMessage),
Response = (), Response = (),
Error = (), Error = (),
@@ -56,11 +56,12 @@ impl<T> StreamService<T> {
} }
} }
impl<T> Service for StreamService<T> impl<T, I> Service for StreamService<T>
where where
T: Service<Request = Io<TcpStream>>, T: Service<Request = Io<I>>,
T::Future: 'static, T::Future: 'static,
T::Error: 'static, T::Error: 'static,
I: FromStream,
{ {
type Request = (Option<CounterGuard>, ServerMessage); type Request = (Option<CounterGuard>, ServerMessage);
type Response = (); type Response = ();
@@ -74,7 +75,7 @@ where
fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future { fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future {
match req { match req {
ServerMessage::Connect(stream) => { ServerMessage::Connect(stream) => {
let stream = TcpStream::from_std(stream, &Handle::default()).map_err(|e| { let stream = FromStream::from_stdstream(stream).map_err(|e| {
error!("Can not convert to an async tcp stream: {}", e); error!("Can not convert to an async tcp stream: {}", e);
}); });
@@ -93,50 +94,55 @@ where
} }
} }
pub(crate) struct StreamNewService<F: ServiceFactory> { pub(crate) struct StreamNewService<F: ServiceFactory<Io>, Io: FromStream> {
name: String, name: String,
inner: F, inner: F,
token: Token, token: Token,
addr: SocketAddr, addr: SocketAddr,
_t: PhantomData<Io>,
} }
impl<F> StreamNewService<F> impl<F, Io> StreamNewService<F, Io>
where where
F: ServiceFactory, F: ServiceFactory<Io>,
Io: FromStream + Send + 'static,
{ {
pub(crate) fn create( pub(crate) fn create(
name: String, name: String,
token: Token, token: Token,
inner: F, inner: F,
addr: SocketAddr, addr: SocketAddr,
) -> Box<InternalServiceFactory> { ) -> Box<dyn InternalServiceFactory> {
Box::new(Self { Box::new(Self {
name, name,
token, token,
inner, inner,
addr, addr,
_t: PhantomData,
}) })
} }
} }
impl<F> InternalServiceFactory for StreamNewService<F> impl<F, Io> InternalServiceFactory for StreamNewService<F, Io>
where where
F: ServiceFactory, F: ServiceFactory<Io>,
Io: FromStream + Send + 'static,
{ {
fn name(&self, _: Token) -> &str { fn name(&self, _: Token) -> &str {
&self.name &self.name
} }
fn clone_factory(&self) -> Box<InternalServiceFactory> { fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
Box::new(Self { Box::new(Self {
name: self.name.clone(), name: self.name.clone(),
inner: self.inner.clone(), inner: self.inner.clone(),
token: self.token, token: self.token,
addr: self.addr, addr: self.addr,
_t: PhantomData,
}) })
} }
fn create(&self) -> Box<Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> { fn create(&self) -> Box<dyn Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> {
let token = self.token; let token = self.token;
let config = ServerConfig::new(self.addr); let config = ServerConfig::new(self.addr);
Box::new( Box::new(
@@ -152,24 +158,25 @@ where
} }
} }
impl InternalServiceFactory for Box<InternalServiceFactory> { impl InternalServiceFactory for Box<dyn InternalServiceFactory> {
fn name(&self, token: Token) -> &str { fn name(&self, token: Token) -> &str {
self.as_ref().name(token) self.as_ref().name(token)
} }
fn clone_factory(&self) -> Box<InternalServiceFactory> { fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
self.as_ref().clone_factory() self.as_ref().clone_factory()
} }
fn create(&self) -> Box<Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> { fn create(&self) -> Box<dyn Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> {
self.as_ref().create() self.as_ref().create()
} }
} }
impl<F, T> ServiceFactory for F impl<F, T, I> ServiceFactory<I> for F
where where
F: Fn() -> T + Send + Clone + 'static, F: Fn() -> T + Send + Clone + 'static,
T: NewService<Config = ServerConfig, Request = Io<TcpStream>>, T: NewService<Config = ServerConfig, Request = Io<I>>,
I: FromStream,
{ {
type NewService = T; type NewService = T;

View File

@@ -27,7 +27,7 @@ pub(crate) struct Signals {
streams: Vec<SigStream>, streams: Vec<SigStream>,
} }
type SigStream = Box<Stream<Item = Signal, Error = io::Error>>; type SigStream = Box<dyn Stream<Item = Signal, Error = io::Error>>;
impl Signals { impl Signals {
pub(crate) fn start(srv: Server) { pub(crate) fn start(srv: Server) {
@@ -46,7 +46,7 @@ impl Signals {
{ {
use tokio_signal::unix; use tokio_signal::unix;
let mut sigs: Vec<Box<Future<Item = SigStream, Error = io::Error>>> = let mut sigs: Vec<Box<dyn Future<Item = SigStream, Error = io::Error>>> =
Vec::new(); Vec::new();
sigs.push(Box::new( sigs.push(Box::new(
tokio_signal::unix::Signal::new(tokio_signal::unix::SIGINT).map(|stream| { tokio_signal::unix::Signal::new(tokio_signal::unix::SIGINT).map(|stream| {

173
actix-server/src/socket.rs Normal file
View File

@@ -0,0 +1,173 @@
use std::{fmt, io, net};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_reactor::Handle;
use tokio_tcp::TcpStream;
pub(crate) enum StdListener {
Tcp(net::TcpListener),
#[cfg(all(unix, feature = "uds"))]
Uds(std::os::unix::net::UnixListener),
}
pub(crate) enum SocketAddr {
Tcp(net::SocketAddr),
#[cfg(all(unix, feature = "uds"))]
Uds(std::os::unix::net::SocketAddr),
}
impl fmt::Display for SocketAddr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
SocketAddr::Tcp(ref addr) => write!(f, "{}", addr),
#[cfg(all(unix, feature = "uds"))]
SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr),
}
}
}
impl fmt::Debug for SocketAddr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
SocketAddr::Tcp(ref addr) => write!(f, "{:?}", addr),
#[cfg(all(unix, feature = "uds"))]
SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr),
}
}
}
impl fmt::Display for StdListener {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
StdListener::Tcp(ref lst) => write!(f, "{}", lst.local_addr().ok().unwrap()),
#[cfg(all(unix, feature = "uds"))]
StdListener::Uds(ref lst) => write!(f, "{:?}", lst.local_addr().ok().unwrap()),
}
}
}
impl StdListener {
pub(crate) fn local_addr(&self) -> SocketAddr {
match self {
StdListener::Tcp(lst) => SocketAddr::Tcp(lst.local_addr().unwrap()),
#[cfg(all(unix, feature = "uds"))]
StdListener::Uds(lst) => SocketAddr::Uds(lst.local_addr().unwrap()),
}
}
pub(crate) fn into_listener(self) -> SocketListener {
match self {
StdListener::Tcp(lst) => SocketListener::Tcp(
mio::net::TcpListener::from_std(lst)
.expect("Can not create mio::net::TcpListener"),
),
#[cfg(all(unix, feature = "uds"))]
StdListener::Uds(lst) => SocketListener::Uds(
mio_uds::UnixListener::from_listener(lst)
.expect("Can not create mio_uds::UnixListener"),
),
}
}
}
#[derive(Debug)]
pub enum StdStream {
Tcp(std::net::TcpStream),
#[cfg(all(unix, feature = "uds"))]
Uds(std::os::unix::net::UnixStream),
}
pub(crate) enum SocketListener {
Tcp(mio::net::TcpListener),
#[cfg(all(unix, feature = "uds"))]
Uds(mio_uds::UnixListener),
}
impl SocketListener {
pub(crate) fn accept(&self) -> io::Result<Option<(StdStream, SocketAddr)>> {
match *self {
SocketListener::Tcp(ref lst) => lst
.accept_std()
.map(|(stream, addr)| Some((StdStream::Tcp(stream), SocketAddr::Tcp(addr)))),
#[cfg(all(unix, feature = "uds"))]
SocketListener::Uds(ref lst) => lst.accept_std().map(|res| {
res.map(|(stream, addr)| (StdStream::Uds(stream), SocketAddr::Uds(addr)))
}),
}
}
}
impl mio::Evented for SocketListener {
fn register(
&self,
poll: &mio::Poll,
token: mio::Token,
interest: mio::Ready,
opts: mio::PollOpt,
) -> io::Result<()> {
match *self {
SocketListener::Tcp(ref lst) => lst.register(poll, token, interest, opts),
#[cfg(all(unix, feature = "uds"))]
SocketListener::Uds(ref lst) => lst.register(poll, token, interest, opts),
}
}
fn reregister(
&self,
poll: &mio::Poll,
token: mio::Token,
interest: mio::Ready,
opts: mio::PollOpt,
) -> io::Result<()> {
match *self {
SocketListener::Tcp(ref lst) => lst.reregister(poll, token, interest, opts),
#[cfg(all(unix, feature = "uds"))]
SocketListener::Uds(ref lst) => lst.reregister(poll, token, interest, opts),
}
}
fn deregister(&self, poll: &mio::Poll) -> io::Result<()> {
match *self {
SocketListener::Tcp(ref lst) => lst.deregister(poll),
#[cfg(all(unix, feature = "uds"))]
SocketListener::Uds(ref lst) => {
let res = lst.deregister(poll);
// cleanup file path
if let Ok(addr) = lst.local_addr() {
if let Some(path) = addr.as_pathname() {
let _ = std::fs::remove_file(path);
}
}
res
}
}
}
}
pub trait FromStream: AsyncRead + AsyncWrite + Sized {
fn from_stdstream(sock: StdStream) -> io::Result<Self>;
}
impl FromStream for TcpStream {
fn from_stdstream(sock: StdStream) -> io::Result<Self> {
match sock {
StdStream::Tcp(stream) => TcpStream::from_std(stream, &Handle::default()),
#[cfg(all(unix, feature = "uds"))]
StdStream::Uds(_) => {
panic!("Should not happen, bug in server impl");
}
}
}
}
#[cfg(all(unix, feature = "uds"))]
impl FromStream for tokio_uds::UnixStream {
fn from_stdstream(sock: StdStream) -> io::Result<Self> {
match sock {
StdStream::Tcp(_) => panic!("Should not happen, bug in server impl"),
StdStream::Uds(stream) => {
tokio_uds::UnixStream::from_std(stream, &Handle::default())
}
}
}
}

View File

@@ -1,6 +1,6 @@
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::{mem, net, time}; use std::{mem, time};
use actix_rt::{spawn, Arbiter}; use actix_rt::{spawn, Arbiter};
use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender};
@@ -12,6 +12,7 @@ use tokio_timer::{sleep, Delay};
use crate::accept::AcceptNotify; use crate::accept::AcceptNotify;
use crate::counter::Counter; use crate::counter::Counter;
use crate::services::{BoxedServerService, InternalServiceFactory, ServerMessage}; use crate::services::{BoxedServerService, InternalServiceFactory, ServerMessage};
use crate::socket::{SocketAddr, StdStream};
use crate::Token; use crate::Token;
pub(crate) struct WorkerCommand(Conn); pub(crate) struct WorkerCommand(Conn);
@@ -25,9 +26,9 @@ pub(crate) struct StopCommand {
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct Conn { pub(crate) struct Conn {
pub io: net::TcpStream, pub io: StdStream,
pub token: Token, pub token: Token,
pub peer: Option<net::SocketAddr>, pub peer: Option<SocketAddr>,
} }
static MAX_CONNS: AtomicUsize = AtomicUsize::new(25600); static MAX_CONNS: AtomicUsize = AtomicUsize::new(25600);
@@ -127,7 +128,7 @@ pub(crate) struct Worker {
services: Vec<Option<(usize, BoxedServerService)>>, services: Vec<Option<(usize, BoxedServerService)>>,
availability: WorkerAvailability, availability: WorkerAvailability,
conns: Counter, conns: Counter,
factories: Vec<Box<InternalServiceFactory>>, factories: Vec<Box<dyn InternalServiceFactory>>,
state: WorkerState, state: WorkerState,
shutdown_timeout: time::Duration, shutdown_timeout: time::Duration,
} }
@@ -136,7 +137,7 @@ impl Worker {
pub(crate) fn start( pub(crate) fn start(
rx: UnboundedReceiver<WorkerCommand>, rx: UnboundedReceiver<WorkerCommand>,
rx2: UnboundedReceiver<StopCommand>, rx2: UnboundedReceiver<StopCommand>,
factories: Vec<Box<InternalServiceFactory>>, factories: Vec<Box<dyn InternalServiceFactory>>,
availability: WorkerAvailability, availability: WorkerAvailability,
shutdown_timeout: time::Duration, shutdown_timeout: time::Duration,
) { ) {
@@ -237,7 +238,7 @@ enum WorkerState {
Restarting( Restarting(
usize, usize,
Token, Token,
Box<Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>>, Box<dyn Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>>,
), ),
Shutdown(Delay, Delay, oneshot::Sender<bool>), Shutdown(Delay, Delay, oneshot::Sender<bool>),
} }

View File

@@ -1,5 +1,12 @@
# Changes # Changes
## [0.4.2] - 2019-08-27
### Fixed
* Check service readiness for `new_apply_cfg` combinator
## [0.4.1] - 2019-06-06 ## [0.4.1] - 2019-06-06
### Added ### Added

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "actix-service" name = "actix-service"
version = "0.4.1" version = "0.4.2"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix Service" description = "Actix Service"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]

View File

@@ -49,6 +49,7 @@ where
C: Clone, C: Clone,
F: FnMut(&C, &mut T::Service) -> R, F: FnMut(&C, &mut T::Service) -> R,
T: NewService<Config = ()>, T: NewService<Config = ()>,
T::InitError: From<T::Error>,
R: IntoFuture<Error = T::InitError>, R: IntoFuture<Error = T::InitError>,
R::Item: IntoService<S>, R::Item: IntoService<S>,
S: Service, S: Service,
@@ -179,6 +180,7 @@ where
C: Clone, C: Clone,
F: FnMut(&C, &mut T::Service) -> R, F: FnMut(&C, &mut T::Service) -> R,
T: NewService<Config = ()>, T: NewService<Config = ()>,
T::InitError: From<T::Error>,
R: IntoFuture<Error = T::InitError>, R: IntoFuture<Error = T::InitError>,
R::Item: IntoService<S>, R::Item: IntoService<S>,
S: Service, S: Service,
@@ -196,8 +198,9 @@ where
ApplyConfigNewServiceFut { ApplyConfigNewServiceFut {
f: self.f.clone(), f: self.f.clone(),
cfg: cfg.clone(), cfg: cfg.clone(),
srv: Some(self.srv.get_ref().new_service(&())),
fut: None, fut: None,
srv: None,
srv_fut: Some(self.srv.get_ref().new_service(&())),
_t: PhantomData, _t: PhantomData,
} }
} }
@@ -208,13 +211,15 @@ where
C: Clone, C: Clone,
F: FnMut(&C, &mut T::Service) -> R, F: FnMut(&C, &mut T::Service) -> R,
T: NewService<Config = ()>, T: NewService<Config = ()>,
T::InitError: From<T::Error>,
R: IntoFuture<Error = T::InitError>, R: IntoFuture<Error = T::InitError>,
R::Item: IntoService<S>, R::Item: IntoService<S>,
S: Service, S: Service,
{ {
cfg: C, cfg: C,
f: Cell<F>, f: Cell<F>,
srv: Option<T::Future>, srv: Option<T::Service>,
srv_fut: Option<T::Future>,
fut: Option<R::Future>, fut: Option<R::Future>,
_t: PhantomData<(S,)>, _t: PhantomData<(S,)>,
} }
@@ -224,6 +229,7 @@ where
C: Clone, C: Clone,
F: FnMut(&C, &mut T::Service) -> R, F: FnMut(&C, &mut T::Service) -> R,
T: NewService<Config = ()>, T: NewService<Config = ()>,
T::InitError: From<T::Error>,
R: IntoFuture<Error = T::InitError>, R: IntoFuture<Error = T::InitError>,
R::Item: IntoService<S>, R::Item: IntoService<S>,
S: Service, S: Service,
@@ -232,12 +238,12 @@ where
type Error = R::Error; type Error = R::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(ref mut fut) = self.srv { if let Some(ref mut fut) = self.srv_fut {
match fut.poll()? { match fut.poll()? {
Async::NotReady => return Ok(Async::NotReady), Async::NotReady => return Ok(Async::NotReady),
Async::Ready(mut srv) => { Async::Ready(srv) => {
let _ = self.srv.take(); let _ = self.srv_fut.take();
self.fut = Some(self.f.get_mut()(&self.cfg, &mut srv).into_future()); self.srv = Some(srv);
return self.poll(); return self.poll();
} }
} }
@@ -245,6 +251,14 @@ where
if let Some(ref mut fut) = self.fut { if let Some(ref mut fut) = self.fut {
Ok(Async::Ready(try_ready!(fut.poll()).into_service())) Ok(Async::Ready(try_ready!(fut.poll()).into_service()))
} else if let Some(ref mut srv) = self.srv {
match srv.poll_ready()? {
Async::NotReady => Ok(Async::NotReady),
Async::Ready(_) => {
self.fut = Some(self.f.get_mut()(&self.cfg, srv).into_future());
return self.poll();
}
}
} else { } else {
Ok(Async::NotReady) Ok(Async::NotReady)
} }

View File

@@ -4,7 +4,7 @@ use futures::{Async, Future, IntoFuture, Poll};
use crate::{NewService, Service}; use crate::{NewService, Service};
pub type BoxedService<Req, Res, Err> = Box< pub type BoxedService<Req, Res, Err> = Box<
Service< dyn Service<
Request = Req, Request = Req,
Response = Res, Response = Res,
Error = Err, Error = Err,
@@ -13,7 +13,7 @@ pub type BoxedService<Req, Res, Err> = Box<
>; >;
pub type BoxedServiceResponse<Res, Err> = pub type BoxedServiceResponse<Res, Err> =
Either<FutureResult<Res, Err>, Box<Future<Item = Res, Error = Err>>>; Either<FutureResult<Res, Err>, Box<dyn Future<Item = Res, Error = Err>>>;
pub struct BoxedNewService<C, Req, Res, Err, InitErr>(Inner<C, Req, Res, Err, InitErr>); pub struct BoxedNewService<C, Req, Res, Err, InitErr>(Inner<C, Req, Res, Err, InitErr>);
@@ -46,14 +46,14 @@ where
} }
type Inner<C, Req, Res, Err, InitErr> = Box< type Inner<C, Req, Res, Err, InitErr> = Box<
NewService< dyn NewService<
Config = C, Config = C,
Request = Req, Request = Req,
Response = Res, Response = Res,
Error = Err, Error = Err,
InitError = InitErr, InitError = InitErr,
Service = BoxedService<Req, Res, Err>, Service = BoxedService<Req, Res, Err>,
Future = Box<Future<Item = BoxedService<Req, Res, Err>, Error = InitErr>>, Future = Box<dyn Future<Item = BoxedService<Req, Res, Err>, Error = InitErr>>,
>, >,
>; >;
@@ -70,7 +70,7 @@ where
type InitError = InitErr; type InitError = InitErr;
type Config = C; type Config = C;
type Service = BoxedService<Req, Res, Err>; type Service = BoxedService<Req, Res, Err>;
type Future = Box<Future<Item = Self::Service, Error = Self::InitError>>; type Future = Box<dyn Future<Item = Self::Service, Error = Self::InitError>>;
fn new_service(&self, cfg: &C) -> Self::Future { fn new_service(&self, cfg: &C) -> Self::Future {
self.0.new_service(cfg) self.0.new_service(cfg)
@@ -99,7 +99,7 @@ where
type InitError = InitErr; type InitError = InitErr;
type Config = C; type Config = C;
type Service = BoxedService<Req, Res, Err>; type Service = BoxedService<Req, Res, Err>;
type Future = Box<Future<Item = Self::Service, Error = Self::InitError>>; type Future = Box<dyn Future<Item = Self::Service, Error = Self::InitError>>;
fn new_service(&self, cfg: &C) -> Self::Future { fn new_service(&self, cfg: &C) -> Self::Future {
Box::new( Box::new(
@@ -133,7 +133,7 @@ where
type Error = Err; type Error = Err;
type Future = Either< type Future = Either<
FutureResult<Self::Response, Self::Error>, FutureResult<Self::Response, Self::Error>,
Box<Future<Item = Self::Response, Error = Self::Error>>, Box<dyn Future<Item = Self::Response, Error = Self::Error>>,
>; >;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self) -> Poll<(), Self::Error> {

View File

@@ -34,6 +34,7 @@ impl<T> Cell<T> {
unsafe { &mut *self.inner.as_ref().get() } unsafe { &mut *self.inner.as_ref().get() }
} }
#[allow(clippy::mut_from_ref)]
pub(crate) unsafe fn get_mut_unsafe(&self) -> &mut T { pub(crate) unsafe fn get_mut_unsafe(&self) -> &mut T {
&mut *self.inner.as_ref().get() &mut *self.inner.as_ref().get()
} }

View File

@@ -176,7 +176,7 @@ impl<T: ?Sized> ServiceExt for T where T: Service {}
/// Creates new `Service` values. /// Creates new `Service` values.
/// ///
/// Acts as a service factory. This is useful for cases where new `Service` /// Acts as a service factory. This is useful for cases where new `Service`
/// values must be produced. One case is a TCP servier listener. The listner /// values must be produced. One case is a TCP server listener. The listener
/// accepts new TCP streams, obtains a new `Service` value using the /// accepts new TCP streams, obtains a new `Service` value using the
/// `NewService` trait, and uses that new `Service` value to process inbound /// `NewService` trait, and uses that new `Service` value to process inbound
/// requests on that new TCP stream. /// requests on that new TCP stream.

View File

@@ -1,9 +1,13 @@
# Changes # Changes
## [0.1.2] - 2019-08-05
### Changed ### Changed
* Update `derive_more` to 0.15 * Update `derive_more` to 0.15
* Update `parking_lot` to 0.9
## [0.1.1] - 2019-06-05 ## [0.1.1] - 2019-06-05
* Update parking_lot * Update parking_lot

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "actix-threadpool" name = "actix-threadpool"
version = "0.1.1" version = "0.1.2"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix thread pool for sync code" description = "Actix thread pool for sync code"
keywords = ["actix", "network", "framework", "async", "futures"] keywords = ["actix", "network", "framework", "async", "futures"]
@@ -20,7 +20,7 @@ path = "src/lib.rs"
[dependencies] [dependencies]
derive_more = "0.15" derive_more = "0.15"
futures = "0.1.25" futures = "0.1.25"
parking_lot = "0.8" parking_lot = "0.9"
lazy_static = "1.2" lazy_static = "1.2"
log = "0.4" log = "0.4"
num_cpus = "1.10" num_cpus = "1.10"

View File

@@ -1,5 +1,28 @@
# Changes # Changes
## [0.4.5] - 2019-07-19
### Removed
* Deprecated `CloneableService` as it is not safe
## [0.4.4] - 2019-07-17
### Changed
* Undeprecate `FramedTransport` as it is actually useful
## [0.4.3] - 2019-07-17
### Deprecated
* Deprecate `CloneableService` as it is not safe and in general not very useful
* Deprecate `FramedTransport` in favor of `actix-ioframe`
## [0.4.2] - 2019-06-26 ## [0.4.2] - 2019-06-26
### Fixed ### Fixed

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "actix-utils" name = "actix-utils"
version = "0.4.2" version = "0.4.5"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix utils - various actix net related services" description = "Actix utils - various actix net related services"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]

View File

@@ -1,52 +0,0 @@
use std::marker::PhantomData;
use std::rc::Rc;
use actix_service::Service;
use futures::Poll;
use super::cell::Cell;
/// Service that allows to turn non-clone service to a service with `Clone` impl
pub struct CloneableService<T> {
service: Cell<T>,
_t: PhantomData<Rc<()>>,
}
impl<T> CloneableService<T> {
pub fn new(service: T) -> Self
where
T: Service,
{
Self {
service: Cell::new(service),
_t: PhantomData,
}
}
}
impl<T> Clone for CloneableService<T> {
fn clone(&self) -> Self {
Self {
service: self.service.clone(),
_t: PhantomData,
}
}
}
impl<T> Service for CloneableService<T>
where
T: Service,
{
type Request = T::Request;
type Response = T::Response;
type Error = T::Error;
type Future = T::Future;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.service.get_mut().poll_ready()
}
fn call(&mut self, req: T::Request) -> Self::Future {
self.service.get_mut().call(req)
}
}

View File

@@ -119,7 +119,7 @@ mod tests {
type Request = (); type Request = ();
type Response = (); type Response = ();
type Error = (); type Error = ();
type Future = Box<Future<Item = (), Error = ()>>; type Future = Box<dyn Future<Item = (), Error = ()>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(())) Ok(Async::Ready(()))

View File

@@ -1,6 +1,6 @@
//! Actix utils - various helper services //! Actix utils - various helper services
mod cell; mod cell;
pub mod cloneable;
pub mod counter; pub mod counter;
pub mod either; pub mod either;
pub mod framed; pub mod framed;

View File

@@ -214,7 +214,7 @@ mod tests {
type Request = oneshot::Receiver<usize>; type Request = oneshot::Receiver<usize>;
type Response = usize; type Response = usize;
type Error = (); type Error = ();
type Future = Box<Future<Item = usize, Error = ()>>; type Future = Box<dyn Future<Item = usize, Error = ()>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(())) Ok(Async::Ready(()))

View File

@@ -45,7 +45,7 @@ where
type Request = S; type Request = S;
type Response = (); type Response = ();
type Error = E; type Error = E;
type Future = Box<Future<Item = (), Error = E>>; type Future = Box<dyn Future<Item = (), Error = E>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(())) Ok(Async::Ready(()))

View File

@@ -190,7 +190,7 @@ mod tests {
type Request = (); type Request = ();
type Response = (); type Response = ();
type Error = (); type Error = ();
type Future = Box<Future<Item = (), Error = ()>>; type Future = Box<dyn Future<Item = (), Error = ()>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(())) Ok(Async::Ready(()))

View File

@@ -151,7 +151,7 @@ impl<'de, T: ResourcePath + 'de> Deserializer<'de> for PathDeserializer<'de, T>
where where
V: Visitor<'de>, V: Visitor<'de>,
{ {
if self.path.len() < 1 { if self.path.is_empty() {
Err(de::value::Error::custom( Err(de::value::Error::custom(
"expeceted at least one parameters", "expeceted at least one parameters",
)) ))

View File

@@ -93,7 +93,7 @@ impl<T: ResourcePath> Path<T> {
#[inline] #[inline]
/// Skip first `n` chars in path /// Skip first `n` chars in path
pub fn skip(&mut self, n: u16) { pub fn skip(&mut self, n: u16) {
self.skip = self.skip + n; self.skip += n;
} }
pub(crate) fn add(&mut self, name: Rc<String>, value: PathItem) { pub(crate) fn add(&mut self, name: Rc<String>, value: PathItem) {

View File

@@ -207,7 +207,7 @@ impl ResourceDef {
"Dynamic path match but not all segments found: {}", "Dynamic path match but not all segments found: {}",
name name
); );
false; return false;
} }
} }
} else { } else {
@@ -279,7 +279,7 @@ impl ResourceDef {
"Dynamic path match but not all segments found: {}", "Dynamic path match but not all segments found: {}",
name name
); );
false; return false;
} }
} }
} else { } else {