mirror of
https://github.com/fafhrd91/actix-net
synced 2025-08-12 12:17:08 +02:00
Compare commits
16 Commits
server-v0.
...
connect-v0
Author | SHA1 | Date | |
---|---|---|---|
|
b1d9b06a87 | ||
|
94e673b50b | ||
|
1a644c6bb1 | ||
|
aad013f559 | ||
|
7a18d9da26 | ||
|
d59b8ce62e | ||
|
3821d511d0 | ||
|
62e429cb0c | ||
|
a2643d475a | ||
|
34c259a8b5 | ||
|
8b398c3386 | ||
|
0baceb0e56 | ||
|
6be1f37f6c | ||
|
a742768feb | ||
|
f913872159 | ||
|
41145040e1 |
@@ -10,6 +10,7 @@ matrix:
|
||||
include:
|
||||
- rust: stable
|
||||
- rust: beta
|
||||
- rust: 1.36.0
|
||||
- rust: nightly-2019-06-15
|
||||
allow_failures:
|
||||
- rust: nightly-2019-06-15
|
||||
|
@@ -2,7 +2,7 @@
|
||||
name = "actix-net"
|
||||
version = "0.3.0"
|
||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
description = "Actix net - framework for the compisible network services for Rust"
|
||||
description = "Actix net - framework for the composable network services for Rust"
|
||||
readme = "README.md"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
homepage = "https://actix.rs"
|
||||
|
@@ -7,7 +7,7 @@ Actix net - framework for composable network services
|
||||
* [API Documentation (Development)](https://actix.rs/actix-net/actix_net/)
|
||||
* [Chat on gitter](https://gitter.im/actix/actix)
|
||||
* Cargo package: [actix-net](https://crates.io/crates/actix-net)
|
||||
* Minimum supported Rust version: 1.32 or later
|
||||
* Minimum supported Rust version: 1.36 or later
|
||||
|
||||
## Example
|
||||
|
||||
|
@@ -1,5 +1,17 @@
|
||||
# Changes
|
||||
|
||||
## [0.2.4] - 2019-09-02
|
||||
|
||||
* Use arbiter's storage for default async resolver
|
||||
|
||||
## [0.2.3] - 2019-08-05
|
||||
|
||||
* Add `ConnectService` and `OpensslConnectService`
|
||||
|
||||
## [0.2.2] - 2019-07-24
|
||||
|
||||
* Add `rustls` support
|
||||
|
||||
## [0.2.1] - 2019-07-17
|
||||
|
||||
### Added
|
||||
|
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "actix-connect"
|
||||
version = "0.2.1"
|
||||
version = "0.2.4"
|
||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
description = "Actix Connector - tcp connector service"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
@@ -26,6 +26,9 @@ default = ["uri"]
|
||||
# openssl
|
||||
ssl = ["openssl", "tokio-openssl"]
|
||||
|
||||
#rustls
|
||||
rust-tls = ["rustls", "tokio-rustls", "webpki"]
|
||||
|
||||
# support http::Uri as connect address
|
||||
uri = ["http"]
|
||||
|
||||
@@ -33,6 +36,7 @@ uri = ["http"]
|
||||
actix-service = "0.4.0"
|
||||
actix-codec = "0.1.2"
|
||||
actix-utils = "0.4.0"
|
||||
actix-rt = "0.2.5"
|
||||
derive_more = "0.15"
|
||||
either = "1.5.2"
|
||||
futures = "0.1.25"
|
||||
@@ -46,6 +50,11 @@ trust-dns-resolver = { version="0.11.0", default-features = false }
|
||||
openssl = { version="0.10", optional = true }
|
||||
tokio-openssl = { version="0.3", optional = true }
|
||||
|
||||
#rustls
|
||||
rustls = { version = "0.15.2", optional = true }
|
||||
tokio-rustls = { version = "0.9.1", optional = true }
|
||||
webpki = { version = "0.19", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
bytes = "0.4"
|
||||
actix-test-server = { version="0.2.2", features=["ssl"] }
|
||||
|
@@ -18,6 +18,17 @@ impl<T> TcpConnectorFactory<T> {
|
||||
pub fn new() -> Self {
|
||||
TcpConnectorFactory(PhantomData)
|
||||
}
|
||||
|
||||
/// Create tcp connector service
|
||||
pub fn service(&self) -> TcpConnector<T> {
|
||||
TcpConnector(PhantomData)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Default for TcpConnectorFactory<T> {
|
||||
fn default() -> Self {
|
||||
TcpConnectorFactory(PhantomData)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Clone for TcpConnectorFactory<T> {
|
||||
@@ -36,12 +47,12 @@ impl<T: Address> NewService for TcpConnectorFactory<T> {
|
||||
type Future = FutureResult<Self::Service, Self::InitError>;
|
||||
|
||||
fn new_service(&self, _: &()) -> Self::Future {
|
||||
ok(TcpConnector(PhantomData))
|
||||
ok(self.service())
|
||||
}
|
||||
}
|
||||
|
||||
/// Tcp connector service
|
||||
#[derive(Debug)]
|
||||
#[derive(Default, Debug)]
|
||||
pub struct TcpConnector<T>(PhantomData<T>);
|
||||
|
||||
impl<T> TcpConnector<T> {
|
||||
|
@@ -10,12 +10,11 @@
|
||||
#[macro_use]
|
||||
extern crate log;
|
||||
|
||||
use std::cell::RefCell;
|
||||
|
||||
mod connect;
|
||||
mod connector;
|
||||
mod error;
|
||||
mod resolver;
|
||||
mod service;
|
||||
pub mod ssl;
|
||||
|
||||
#[cfg(feature = "uri")]
|
||||
@@ -29,7 +28,9 @@ pub use self::connect::{Address, Connect, Connection};
|
||||
pub use self::connector::{TcpConnector, TcpConnectorFactory};
|
||||
pub use self::error::ConnectError;
|
||||
pub use self::resolver::{Resolver, ResolverFactory};
|
||||
pub use self::service::{ConnectService, ConnectServiceFactory};
|
||||
|
||||
use actix_rt::Arbiter;
|
||||
use actix_service::{NewService, Service, ServiceExt};
|
||||
use tokio_tcp::TcpStream;
|
||||
|
||||
@@ -39,16 +40,12 @@ pub fn start_resolver(cfg: ResolverConfig, opts: ResolverOpts) -> AsyncResolver
|
||||
resolver
|
||||
}
|
||||
|
||||
thread_local! {
|
||||
static DEFAULT_RESOLVER: RefCell<Option<AsyncResolver>> = RefCell::new(None);
|
||||
}
|
||||
struct DefaultResolver(AsyncResolver);
|
||||
|
||||
pub(crate) fn get_default_resolver() -> AsyncResolver {
|
||||
DEFAULT_RESOLVER.with(|cell| {
|
||||
if let Some(ref resolver) = *cell.borrow() {
|
||||
return resolver.clone();
|
||||
}
|
||||
|
||||
if Arbiter::contains_item::<DefaultResolver>() {
|
||||
return Arbiter::get_item(|item: &DefaultResolver| item.0.clone());
|
||||
} else {
|
||||
let (cfg, opts) = match read_system_conf() {
|
||||
Ok((cfg, opts)) => (cfg, opts),
|
||||
Err(e) => {
|
||||
@@ -60,9 +57,9 @@ pub(crate) fn get_default_resolver() -> AsyncResolver {
|
||||
let (resolver, bg) = AsyncResolver::new(cfg, opts);
|
||||
tokio_current_thread::spawn(bg);
|
||||
|
||||
*cell.borrow_mut() = Some(resolver.clone());
|
||||
Arbiter::set_item(DefaultResolver(resolver.clone()));
|
||||
resolver
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start_default_resolver() -> AsyncResolver {
|
||||
|
@@ -25,6 +25,13 @@ impl<T> ResolverFactory<T> {
|
||||
_t: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn service(&self) -> Resolver<T> {
|
||||
Resolver {
|
||||
resolver: self.resolver.clone(),
|
||||
_t: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Default for ResolverFactory<T> {
|
||||
@@ -55,10 +62,7 @@ impl<T: Address> NewService for ResolverFactory<T> {
|
||||
type Future = FutureResult<Self::Service, Self::InitError>;
|
||||
|
||||
fn new_service(&self, _: &()) -> Self::Future {
|
||||
ok(Resolver {
|
||||
resolver: self.resolver.clone(),
|
||||
_t: PhantomData,
|
||||
})
|
||||
ok(self.service())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -109,17 +113,15 @@ impl<T: Address> Service for Resolver<T> {
|
||||
fn call(&mut self, mut req: Connect<T>) -> Self::Future {
|
||||
if req.addr.is_some() {
|
||||
Either::B(ok(req))
|
||||
} else if let Ok(ip) = req.host().parse() {
|
||||
req.addr = Some(either::Either::Left(SocketAddr::new(ip, req.port())));
|
||||
Either::B(ok(req))
|
||||
} else {
|
||||
if let Ok(ip) = req.host().parse() {
|
||||
req.addr = Some(either::Either::Left(SocketAddr::new(ip, req.port())));
|
||||
Either::B(ok(req))
|
||||
} else {
|
||||
trace!("DNS resolver: resolving host {:?}", req.host());
|
||||
if self.resolver.is_none() {
|
||||
self.resolver = Some(get_default_resolver());
|
||||
}
|
||||
Either::A(ResolverFuture::new(req, self.resolver.as_ref().unwrap()))
|
||||
trace!("DNS resolver: resolving host {:?}", req.host());
|
||||
if self.resolver.is_none() {
|
||||
self.resolver = Some(get_default_resolver());
|
||||
}
|
||||
Either::A(ResolverFuture::new(req, self.resolver.as_ref().unwrap()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
123
actix-connect/src/service.rs
Normal file
123
actix-connect/src/service.rs
Normal file
@@ -0,0 +1,123 @@
|
||||
use actix_service::{NewService, Service};
|
||||
use futures::future::{ok, FutureResult};
|
||||
use futures::{try_ready, Async, Future, Poll};
|
||||
use tokio_tcp::TcpStream;
|
||||
use trust_dns_resolver::AsyncResolver;
|
||||
|
||||
use crate::connect::{Address, Connect, Connection};
|
||||
use crate::connector::{TcpConnector, TcpConnectorFactory};
|
||||
use crate::error::ConnectError;
|
||||
use crate::resolver::{Resolver, ResolverFactory};
|
||||
|
||||
pub struct ConnectServiceFactory<T> {
|
||||
tcp: TcpConnectorFactory<T>,
|
||||
resolver: ResolverFactory<T>,
|
||||
}
|
||||
|
||||
impl<T> ConnectServiceFactory<T> {
|
||||
/// Construct new ConnectService factory
|
||||
pub fn new() -> Self {
|
||||
ConnectServiceFactory {
|
||||
tcp: TcpConnectorFactory::default(),
|
||||
resolver: ResolverFactory::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Construct new connect service with custom dns resolver
|
||||
pub fn with_resolver(resolver: AsyncResolver) -> Self {
|
||||
ConnectServiceFactory {
|
||||
tcp: TcpConnectorFactory::default(),
|
||||
resolver: ResolverFactory::new(resolver),
|
||||
}
|
||||
}
|
||||
|
||||
/// Construct new service
|
||||
pub fn service(&self) -> ConnectService<T> {
|
||||
ConnectService {
|
||||
tcp: self.tcp.service(),
|
||||
resolver: self.resolver.service(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Default for ConnectServiceFactory<T> {
|
||||
fn default() -> Self {
|
||||
ConnectServiceFactory {
|
||||
tcp: TcpConnectorFactory::default(),
|
||||
resolver: ResolverFactory::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Clone for ConnectServiceFactory<T> {
|
||||
fn clone(&self) -> Self {
|
||||
ConnectServiceFactory {
|
||||
tcp: self.tcp.clone(),
|
||||
resolver: self.resolver.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Address> NewService for ConnectServiceFactory<T> {
|
||||
type Request = Connect<T>;
|
||||
type Response = Connection<T, TcpStream>;
|
||||
type Error = ConnectError;
|
||||
type Config = ();
|
||||
type Service = ConnectService<T>;
|
||||
type InitError = ();
|
||||
type Future = FutureResult<Self::Service, Self::InitError>;
|
||||
|
||||
fn new_service(&self, _: &()) -> Self::Future {
|
||||
ok(self.service())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ConnectService<T> {
|
||||
tcp: TcpConnector<T>,
|
||||
resolver: Resolver<T>,
|
||||
}
|
||||
|
||||
impl<T: Address> Service for ConnectService<T> {
|
||||
type Request = Connect<T>;
|
||||
type Response = Connection<T, TcpStream>;
|
||||
type Error = ConnectError;
|
||||
type Future = ConnectServiceResponse<T>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Connect<T>) -> Self::Future {
|
||||
ConnectServiceResponse {
|
||||
fut1: Some(self.resolver.call(req)),
|
||||
fut2: None,
|
||||
tcp: self.tcp.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ConnectServiceResponse<T: Address> {
|
||||
fut1: Option<<Resolver<T> as Service>::Future>,
|
||||
fut2: Option<<TcpConnector<T> as Service>::Future>,
|
||||
tcp: TcpConnector<T>,
|
||||
}
|
||||
|
||||
impl<T: Address> Future for ConnectServiceResponse<T> {
|
||||
type Item = Connection<T, TcpStream>;
|
||||
type Error = ConnectError;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
if let Some(ref mut fut) = self.fut1 {
|
||||
let res = try_ready!(fut.poll());
|
||||
let _ = self.fut1.take();
|
||||
self.fut2 = Some(self.tcp.call(res));
|
||||
}
|
||||
|
||||
if let Some(ref mut fut) = self.fut2 {
|
||||
return fut.poll();
|
||||
}
|
||||
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
}
|
@@ -3,4 +3,10 @@
|
||||
#[cfg(feature = "ssl")]
|
||||
mod openssl;
|
||||
#[cfg(feature = "ssl")]
|
||||
pub use self::openssl::OpensslConnector;
|
||||
pub use self::openssl::{
|
||||
OpensslConnectService, OpensslConnectServiceFactory, OpensslConnector,
|
||||
};
|
||||
#[cfg(feature = "rust-tls")]
|
||||
mod rustls;
|
||||
#[cfg(feature = "rust-tls")]
|
||||
pub use self::rustls::RustlsConnector;
|
||||
|
@@ -1,13 +1,17 @@
|
||||
use std::fmt;
|
||||
use std::marker::PhantomData;
|
||||
use std::{fmt, io};
|
||||
|
||||
use actix_codec::{AsyncRead, AsyncWrite};
|
||||
use actix_service::{NewService, Service};
|
||||
use futures::{future::ok, future::FutureResult, Async, Future, Poll};
|
||||
use futures::{future::ok, future::FutureResult, try_ready, Async, Future, Poll};
|
||||
use openssl::ssl::{HandshakeError, SslConnector};
|
||||
use tokio_openssl::{ConnectAsync, SslConnectorExt, SslStream};
|
||||
use tokio_tcp::TcpStream;
|
||||
use trust_dns_resolver::AsyncResolver;
|
||||
|
||||
use crate::{Address, Connection};
|
||||
use crate::{
|
||||
Address, Connect, ConnectError, ConnectService, ConnectServiceFactory, Connection,
|
||||
};
|
||||
|
||||
/// Openssl connector factory
|
||||
pub struct OpensslConnector<T, U> {
|
||||
@@ -77,6 +81,15 @@ pub struct OpensslConnectorService<T, U> {
|
||||
_t: PhantomData<(T, U)>,
|
||||
}
|
||||
|
||||
impl<T, U> Clone for OpensslConnectorService<T, U> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
connector: self.connector.clone(),
|
||||
_t: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Address, U> Service for OpensslConnectorService<T, U>
|
||||
where
|
||||
U: AsyncRead + AsyncWrite + fmt::Debug,
|
||||
@@ -126,3 +139,113 @@ where
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct OpensslConnectServiceFactory<T> {
|
||||
tcp: ConnectServiceFactory<T>,
|
||||
openssl: OpensslConnector<T, TcpStream>,
|
||||
}
|
||||
|
||||
impl<T> OpensslConnectServiceFactory<T> {
|
||||
/// Construct new OpensslConnectService factory
|
||||
pub fn new(connector: SslConnector) -> Self {
|
||||
OpensslConnectServiceFactory {
|
||||
tcp: ConnectServiceFactory::default(),
|
||||
openssl: OpensslConnector::new(connector),
|
||||
}
|
||||
}
|
||||
|
||||
/// Construct new connect service with custom dns resolver
|
||||
pub fn with_resolver(connector: SslConnector, resolver: AsyncResolver) -> Self {
|
||||
OpensslConnectServiceFactory {
|
||||
tcp: ConnectServiceFactory::with_resolver(resolver),
|
||||
openssl: OpensslConnector::new(connector),
|
||||
}
|
||||
}
|
||||
|
||||
/// Construct openssl connect service
|
||||
pub fn service(&self) -> OpensslConnectService<T> {
|
||||
OpensslConnectService {
|
||||
tcp: self.tcp.service(),
|
||||
openssl: OpensslConnectorService {
|
||||
connector: self.openssl.connector.clone(),
|
||||
_t: PhantomData,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Clone for OpensslConnectServiceFactory<T> {
|
||||
fn clone(&self) -> Self {
|
||||
OpensslConnectServiceFactory {
|
||||
tcp: self.tcp.clone(),
|
||||
openssl: self.openssl.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Address> NewService for OpensslConnectServiceFactory<T> {
|
||||
type Request = Connect<T>;
|
||||
type Response = SslStream<TcpStream>;
|
||||
type Error = ConnectError;
|
||||
type Config = ();
|
||||
type Service = OpensslConnectService<T>;
|
||||
type InitError = ();
|
||||
type Future = FutureResult<Self::Service, Self::InitError>;
|
||||
|
||||
fn new_service(&self, _: &()) -> Self::Future {
|
||||
ok(self.service())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct OpensslConnectService<T> {
|
||||
tcp: ConnectService<T>,
|
||||
openssl: OpensslConnectorService<T, TcpStream>,
|
||||
}
|
||||
|
||||
impl<T: Address> Service for OpensslConnectService<T> {
|
||||
type Request = Connect<T>;
|
||||
type Response = SslStream<TcpStream>;
|
||||
type Error = ConnectError;
|
||||
type Future = OpensslConnectServiceResponse<T>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Connect<T>) -> Self::Future {
|
||||
OpensslConnectServiceResponse {
|
||||
fut1: Some(self.tcp.call(req)),
|
||||
fut2: None,
|
||||
openssl: self.openssl.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct OpensslConnectServiceResponse<T: Address> {
|
||||
fut1: Option<<ConnectService<T> as Service>::Future>,
|
||||
fut2: Option<<OpensslConnectorService<T, TcpStream> as Service>::Future>,
|
||||
openssl: OpensslConnectorService<T, TcpStream>,
|
||||
}
|
||||
|
||||
impl<T: Address> Future for OpensslConnectServiceResponse<T> {
|
||||
type Item = SslStream<TcpStream>;
|
||||
type Error = ConnectError;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
if let Some(ref mut fut) = self.fut1 {
|
||||
let res = try_ready!(fut.poll());
|
||||
let _ = self.fut1.take();
|
||||
self.fut2 = Some(self.openssl.call(res));
|
||||
}
|
||||
|
||||
if let Some(ref mut fut) = self.fut2 {
|
||||
let connect = try_ready!(fut
|
||||
.poll()
|
||||
.map_err(|e| ConnectError::Io(io::Error::new(io::ErrorKind::Other, e))));
|
||||
Ok(Async::Ready(connect.into_parts().0))
|
||||
} else {
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
133
actix-connect/src/ssl/rustls.rs
Normal file
133
actix-connect/src/ssl/rustls.rs
Normal file
@@ -0,0 +1,133 @@
|
||||
use std::fmt;
|
||||
use std::marker::PhantomData;
|
||||
|
||||
use actix_codec::{AsyncRead, AsyncWrite};
|
||||
use actix_service::{NewService, Service};
|
||||
use futures::{future::ok, future::FutureResult, Async, Future, Poll};
|
||||
use std::sync::Arc;
|
||||
use tokio_rustls::{
|
||||
rustls::{ClientConfig, ClientSession},
|
||||
Connect, TlsConnector, TlsStream,
|
||||
};
|
||||
use webpki::DNSNameRef;
|
||||
|
||||
use crate::{Address, Connection};
|
||||
|
||||
/// Rustls connector factory
|
||||
pub struct RustlsConnector<T, U> {
|
||||
connector: Arc<ClientConfig>,
|
||||
_t: PhantomData<(T, U)>,
|
||||
}
|
||||
|
||||
impl<T, U> RustlsConnector<T, U> {
|
||||
pub fn new(connector: Arc<ClientConfig>) -> Self {
|
||||
RustlsConnector {
|
||||
connector,
|
||||
_t: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U> RustlsConnector<T, U>
|
||||
where
|
||||
T: Address,
|
||||
U: AsyncRead + AsyncWrite + fmt::Debug,
|
||||
{
|
||||
pub fn service(
|
||||
connector: Arc<ClientConfig>,
|
||||
) -> impl Service<
|
||||
Request = Connection<T, U>,
|
||||
Response = Connection<T, TlsStream<U, ClientSession>>,
|
||||
Error = std::io::Error,
|
||||
> {
|
||||
RustlsConnectorService {
|
||||
connector: connector,
|
||||
_t: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U> Clone for RustlsConnector<T, U> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
connector: self.connector.clone(),
|
||||
_t: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Address, U> NewService for RustlsConnector<T, U>
|
||||
where
|
||||
U: AsyncRead + AsyncWrite + fmt::Debug,
|
||||
{
|
||||
type Request = Connection<T, U>;
|
||||
type Response = Connection<T, TlsStream<U, ClientSession>>;
|
||||
type Error = std::io::Error;
|
||||
type Config = ();
|
||||
type Service = RustlsConnectorService<T, U>;
|
||||
type InitError = ();
|
||||
type Future = FutureResult<Self::Service, Self::InitError>;
|
||||
|
||||
fn new_service(&self, _: &()) -> Self::Future {
|
||||
ok(RustlsConnectorService {
|
||||
connector: self.connector.clone(),
|
||||
_t: PhantomData,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub struct RustlsConnectorService<T, U> {
|
||||
connector: Arc<ClientConfig>,
|
||||
_t: PhantomData<(T, U)>,
|
||||
}
|
||||
|
||||
impl<T: Address, U> Service for RustlsConnectorService<T, U>
|
||||
where
|
||||
U: AsyncRead + AsyncWrite + fmt::Debug,
|
||||
{
|
||||
type Request = Connection<T, U>;
|
||||
type Response = Connection<T, TlsStream<U, ClientSession>>;
|
||||
type Error = std::io::Error;
|
||||
type Future = ConnectAsyncExt<T, U>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, stream: Connection<T, U>) -> Self::Future {
|
||||
trace!("SSL Handshake start for: {:?}", stream.host());
|
||||
let (io, stream) = stream.replace(());
|
||||
let host = DNSNameRef::try_from_ascii_str(stream.host()).unwrap();
|
||||
ConnectAsyncExt {
|
||||
fut: TlsConnector::from(self.connector.clone()).connect(host, io),
|
||||
stream: Some(stream),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ConnectAsyncExt<T, U> {
|
||||
fut: Connect<U>,
|
||||
stream: Option<Connection<T, ()>>,
|
||||
}
|
||||
|
||||
impl<T: Address, U> Future for ConnectAsyncExt<T, U>
|
||||
where
|
||||
U: AsyncRead + AsyncWrite + fmt::Debug,
|
||||
{
|
||||
type Item = Connection<T, TlsStream<U, ClientSession>>;
|
||||
type Error = std::io::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
match self.fut.poll().map_err(|e| {
|
||||
trace!("SSL Handshake error: {:?}", e);
|
||||
e
|
||||
})? {
|
||||
Async::Ready(stream) => {
|
||||
let s = self.stream.take().unwrap();
|
||||
trace!("SSL Handshake success: {:?}", s.host());
|
||||
Ok(Async::Ready(s.replace(stream).1))
|
||||
}
|
||||
Async::NotReady => Ok(Async::NotReady),
|
||||
}
|
||||
}
|
||||
}
|
@@ -26,6 +26,22 @@ fn test_string() {
|
||||
assert_eq!(con.peer_addr().unwrap(), srv.addr());
|
||||
}
|
||||
|
||||
#[cfg(feature = "rust-tls")]
|
||||
#[test]
|
||||
fn test_rustls_string() {
|
||||
let mut srv = TestServer::with(|| {
|
||||
service_fn(|io: Io<tokio_tcp::TcpStream>| {
|
||||
Framed::new(io.into_parts().0, BytesCodec)
|
||||
.send(Bytes::from_static(b"test"))
|
||||
.then(|_| Ok::<_, ()>(()))
|
||||
})
|
||||
});
|
||||
|
||||
let mut conn = default_connector();
|
||||
let addr = format!("localhost:{}", srv.port());
|
||||
let con = srv.run_on(move || conn.call(addr.into())).unwrap();
|
||||
assert_eq!(con.peer_addr().unwrap(), srv.addr());
|
||||
}
|
||||
#[test]
|
||||
fn test_static_str() {
|
||||
let mut srv = TestServer::with(|| {
|
||||
@@ -107,3 +123,20 @@ fn test_uri() {
|
||||
let con = srv.run_on(move || conn.call(addr.into())).unwrap();
|
||||
assert_eq!(con.peer_addr().unwrap(), srv.addr());
|
||||
}
|
||||
|
||||
#[cfg(feature = "rust-tls")]
|
||||
#[test]
|
||||
fn test_rustls_uri() {
|
||||
let mut srv = TestServer::with(|| {
|
||||
service_fn(|io: Io<tokio_tcp::TcpStream>| {
|
||||
Framed::new(io.into_parts().0, BytesCodec)
|
||||
.send(Bytes::from_static(b"test"))
|
||||
.then(|_| Ok::<_, ()>(()))
|
||||
})
|
||||
});
|
||||
|
||||
let mut conn = default_connector();
|
||||
let addr = Uri::try_from(format!("https://localhost:{}", srv.port())).unwrap();
|
||||
let con = srv.run_on(move || conn.call(addr.into())).unwrap();
|
||||
assert_eq!(con.peer_addr().unwrap(), srv.addr());
|
||||
}
|
||||
|
@@ -44,7 +44,7 @@ where
|
||||
framed: Framed<T, U>,
|
||||
rx: Option<mpsc::UnboundedReceiver<FramedMessage<<U as Encoder>::Item>>>,
|
||||
inner: Cell<FramedDispatcherInner<<U as Encoder>::Item, S::Error>>,
|
||||
disconnect: Option<Rc<Fn(&mut St, bool)>>,
|
||||
disconnect: Option<Rc<dyn Fn(&mut St, bool)>>,
|
||||
}
|
||||
|
||||
impl<St, S, T, U> FramedDispatcher<St, S, T, U>
|
||||
@@ -63,7 +63,7 @@ where
|
||||
service: F,
|
||||
rx: mpsc::UnboundedReceiver<FramedMessage<<U as Encoder>::Item>>,
|
||||
sink: Sink<<U as Encoder>::Item>,
|
||||
disconnect: Option<Rc<Fn(&mut St, bool)>>,
|
||||
disconnect: Option<Rc<dyn Fn(&mut St, bool)>>,
|
||||
) -> Self {
|
||||
FramedDispatcher {
|
||||
framed,
|
||||
|
@@ -62,7 +62,7 @@ impl<St, Codec> Builder<St, Codec> {
|
||||
|
||||
pub struct ServiceBuilder<St, C, Io, Codec> {
|
||||
connect: C,
|
||||
disconnect: Option<Rc<Fn(&mut St, bool)>>,
|
||||
disconnect: Option<Rc<dyn Fn(&mut St, bool)>>,
|
||||
_t: PhantomData<(St, Io, Codec)>,
|
||||
}
|
||||
|
||||
@@ -113,7 +113,7 @@ where
|
||||
|
||||
pub struct NewServiceBuilder<St, C, Io, Codec> {
|
||||
connect: C,
|
||||
disconnect: Option<Rc<Fn(&mut St, bool)>>,
|
||||
disconnect: Option<Rc<dyn Fn(&mut St, bool)>>,
|
||||
_t: PhantomData<(St, Io, Codec)>,
|
||||
}
|
||||
|
||||
@@ -170,7 +170,7 @@ where
|
||||
pub(crate) struct FramedService<St, C, T, Io, Codec, Cfg> {
|
||||
connect: C,
|
||||
handler: Rc<T>,
|
||||
disconnect: Option<Rc<Fn(&mut St, bool)>>,
|
||||
disconnect: Option<Rc<dyn Fn(&mut St, bool)>>,
|
||||
_t: PhantomData<(St, Io, Codec, Cfg)>,
|
||||
}
|
||||
|
||||
@@ -198,7 +198,7 @@ where
|
||||
type Error = ServiceError<C::Error, Codec>;
|
||||
type InitError = C::InitError;
|
||||
type Service = FramedServiceImpl<St, C::Service, T, Io, Codec>;
|
||||
type Future = Box<Future<Item = Self::Service, Error = Self::InitError>>;
|
||||
type Future = Box<dyn Future<Item = Self::Service, Error = Self::InitError>>;
|
||||
|
||||
fn new_service(&self, _: &Cfg) -> Self::Future {
|
||||
let handler = self.handler.clone();
|
||||
@@ -221,7 +221,7 @@ where
|
||||
pub struct FramedServiceImpl<St, C, T, Io, Codec> {
|
||||
connect: C,
|
||||
handler: Rc<T>,
|
||||
disconnect: Option<Rc<Fn(&mut St, bool)>>,
|
||||
disconnect: Option<Rc<dyn Fn(&mut St, bool)>>,
|
||||
_t: PhantomData<(St, Io, Codec)>,
|
||||
}
|
||||
|
||||
@@ -280,7 +280,7 @@ where
|
||||
<Codec as Encoder>::Error: std::fmt::Debug,
|
||||
{
|
||||
inner: FramedServiceImplResponseInner<St, Io, Codec, C, T>,
|
||||
disconnect: Option<Rc<Fn(&mut St, bool)>>,
|
||||
disconnect: Option<Rc<dyn Fn(&mut St, bool)>>,
|
||||
}
|
||||
|
||||
enum FramedServiceImplResponseInner<St, Io, Codec, C, T>
|
||||
|
@@ -1,5 +1,12 @@
|
||||
# Changes
|
||||
|
||||
## [0.2.5] - 2019-09-02
|
||||
|
||||
### Added
|
||||
|
||||
* Add arbiter specific storage
|
||||
|
||||
|
||||
## [0.2.4] - 2019-07-17
|
||||
|
||||
### Changed
|
||||
|
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "actix-rt"
|
||||
version = "0.2.4"
|
||||
version = "0.2.5"
|
||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
description = "Actix runtime"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
|
@@ -1,3 +1,4 @@
|
||||
use std::any::{Any, TypeId};
|
||||
use std::cell::{Cell, RefCell};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
@@ -16,15 +17,16 @@ use copyless::BoxHelper;
|
||||
thread_local!(
|
||||
static ADDR: RefCell<Option<Arbiter>> = RefCell::new(None);
|
||||
static RUNNING: Cell<bool> = Cell::new(false);
|
||||
static Q: RefCell<Vec<Box<Future<Item = (), Error = ()>>>> = RefCell::new(Vec::new());
|
||||
static Q: RefCell<Vec<Box<dyn Future<Item = (), Error = ()>>>> = RefCell::new(Vec::new());
|
||||
static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new());
|
||||
);
|
||||
|
||||
pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0);
|
||||
|
||||
pub(crate) enum ArbiterCommand {
|
||||
Stop,
|
||||
Execute(Box<Future<Item = (), Error = ()> + Send>),
|
||||
ExecuteFn(Box<FnExec>),
|
||||
Execute(Box<dyn Future<Item = (), Error = ()> + Send>),
|
||||
ExecuteFn(Box<dyn FnExec>),
|
||||
}
|
||||
|
||||
impl fmt::Debug for ArbiterCommand {
|
||||
@@ -56,6 +58,7 @@ impl Arbiter {
|
||||
let arb = Arbiter(tx);
|
||||
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
|
||||
RUNNING.with(|cell| cell.set(false));
|
||||
STORAGE.with(|cell| cell.borrow_mut().clear());
|
||||
Arbiter::spawn(ArbiterController { stop: None, rx });
|
||||
|
||||
arb
|
||||
@@ -90,6 +93,7 @@ impl Arbiter {
|
||||
|
||||
let (stop, stop_rx) = channel();
|
||||
RUNNING.with(|cell| cell.set(true));
|
||||
STORAGE.with(|cell| cell.borrow_mut().clear());
|
||||
|
||||
System::set_current(sys);
|
||||
|
||||
@@ -180,7 +184,7 @@ impl Arbiter {
|
||||
let _ = self
|
||||
.0
|
||||
.unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || {
|
||||
let _ = f();
|
||||
f();
|
||||
})));
|
||||
}
|
||||
|
||||
@@ -202,6 +206,50 @@ impl Arbiter {
|
||||
})));
|
||||
rx
|
||||
}
|
||||
|
||||
/// Set item to arbiter storage
|
||||
pub fn set_item<T: 'static>(item: T) {
|
||||
STORAGE.with(move |cell| cell.borrow_mut().insert(TypeId::of::<T>(), Box::new(item)));
|
||||
}
|
||||
|
||||
/// Check if arbiter storage contains item
|
||||
pub fn contains_item<T: 'static>() -> bool {
|
||||
STORAGE.with(move |cell| cell.borrow().get(&TypeId::of::<T>()).is_some())
|
||||
}
|
||||
|
||||
/// Get a reference to a type previously inserted on this arbiter's storage.
|
||||
///
|
||||
/// Panics is item is not inserted
|
||||
pub fn get_item<T: 'static, F, R>(mut f: F) -> R
|
||||
where
|
||||
F: FnMut(&T) -> R,
|
||||
{
|
||||
STORAGE.with(move |cell| {
|
||||
let st = cell.borrow();
|
||||
let item = st
|
||||
.get(&TypeId::of::<T>())
|
||||
.and_then(|boxed| (&**boxed as &(dyn Any + 'static)).downcast_ref())
|
||||
.unwrap();
|
||||
f(item)
|
||||
})
|
||||
}
|
||||
|
||||
/// Get a mutable reference to a type previously inserted on this arbiter's storage.
|
||||
///
|
||||
/// Panics is item is not inserted
|
||||
pub fn get_mut_item<T: 'static, F, R>(mut f: F) -> R
|
||||
where
|
||||
F: FnMut(&mut T) -> R,
|
||||
{
|
||||
STORAGE.with(move |cell| {
|
||||
let mut st = cell.borrow_mut();
|
||||
let item = st
|
||||
.get_mut(&TypeId::of::<T>())
|
||||
.and_then(|boxed| (&mut **boxed as &mut (dyn Any + 'static)).downcast_mut())
|
||||
.unwrap();
|
||||
f(item)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
struct ArbiterController {
|
||||
@@ -312,7 +360,7 @@ impl<F> FnExec for F
|
||||
where
|
||||
F: FnOnce() + Send + 'static,
|
||||
{
|
||||
#[cfg_attr(feature = "cargo-clippy", allow(boxed_local))]
|
||||
#[allow(clippy::boxed_local)]
|
||||
fn call_box(self: Box<Self>) {
|
||||
(*self)()
|
||||
}
|
||||
|
@@ -40,7 +40,7 @@ impl Error for RunError {
|
||||
fn description(&self) -> &str {
|
||||
self.inner.description()
|
||||
}
|
||||
fn cause(&self) -> Option<&Error> {
|
||||
fn cause(&self) -> Option<&dyn Error> {
|
||||
self.inner.source()
|
||||
}
|
||||
}
|
||||
|
@@ -1,5 +1,12 @@
|
||||
# Changes
|
||||
|
||||
## [0.4.2] - 2019-08-27
|
||||
|
||||
### Fixed
|
||||
|
||||
* Check service readiness for `new_apply_cfg` combinator
|
||||
|
||||
|
||||
## [0.4.1] - 2019-06-06
|
||||
|
||||
### Added
|
||||
|
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "actix-service"
|
||||
version = "0.4.1"
|
||||
version = "0.4.2"
|
||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
description = "Actix Service"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
|
@@ -49,6 +49,7 @@ where
|
||||
C: Clone,
|
||||
F: FnMut(&C, &mut T::Service) -> R,
|
||||
T: NewService<Config = ()>,
|
||||
T::InitError: From<T::Error>,
|
||||
R: IntoFuture<Error = T::InitError>,
|
||||
R::Item: IntoService<S>,
|
||||
S: Service,
|
||||
@@ -179,6 +180,7 @@ where
|
||||
C: Clone,
|
||||
F: FnMut(&C, &mut T::Service) -> R,
|
||||
T: NewService<Config = ()>,
|
||||
T::InitError: From<T::Error>,
|
||||
R: IntoFuture<Error = T::InitError>,
|
||||
R::Item: IntoService<S>,
|
||||
S: Service,
|
||||
@@ -196,8 +198,9 @@ where
|
||||
ApplyConfigNewServiceFut {
|
||||
f: self.f.clone(),
|
||||
cfg: cfg.clone(),
|
||||
srv: Some(self.srv.get_ref().new_service(&())),
|
||||
fut: None,
|
||||
srv: None,
|
||||
srv_fut: Some(self.srv.get_ref().new_service(&())),
|
||||
_t: PhantomData,
|
||||
}
|
||||
}
|
||||
@@ -208,13 +211,15 @@ where
|
||||
C: Clone,
|
||||
F: FnMut(&C, &mut T::Service) -> R,
|
||||
T: NewService<Config = ()>,
|
||||
T::InitError: From<T::Error>,
|
||||
R: IntoFuture<Error = T::InitError>,
|
||||
R::Item: IntoService<S>,
|
||||
S: Service,
|
||||
{
|
||||
cfg: C,
|
||||
f: Cell<F>,
|
||||
srv: Option<T::Future>,
|
||||
srv: Option<T::Service>,
|
||||
srv_fut: Option<T::Future>,
|
||||
fut: Option<R::Future>,
|
||||
_t: PhantomData<(S,)>,
|
||||
}
|
||||
@@ -224,6 +229,7 @@ where
|
||||
C: Clone,
|
||||
F: FnMut(&C, &mut T::Service) -> R,
|
||||
T: NewService<Config = ()>,
|
||||
T::InitError: From<T::Error>,
|
||||
R: IntoFuture<Error = T::InitError>,
|
||||
R::Item: IntoService<S>,
|
||||
S: Service,
|
||||
@@ -232,12 +238,12 @@ where
|
||||
type Error = R::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
if let Some(ref mut fut) = self.srv {
|
||||
if let Some(ref mut fut) = self.srv_fut {
|
||||
match fut.poll()? {
|
||||
Async::NotReady => return Ok(Async::NotReady),
|
||||
Async::Ready(mut srv) => {
|
||||
let _ = self.srv.take();
|
||||
self.fut = Some(self.f.get_mut()(&self.cfg, &mut srv).into_future());
|
||||
Async::Ready(srv) => {
|
||||
let _ = self.srv_fut.take();
|
||||
self.srv = Some(srv);
|
||||
return self.poll();
|
||||
}
|
||||
}
|
||||
@@ -245,6 +251,14 @@ where
|
||||
|
||||
if let Some(ref mut fut) = self.fut {
|
||||
Ok(Async::Ready(try_ready!(fut.poll()).into_service()))
|
||||
} else if let Some(ref mut srv) = self.srv {
|
||||
match srv.poll_ready()? {
|
||||
Async::NotReady => Ok(Async::NotReady),
|
||||
Async::Ready(_) => {
|
||||
self.fut = Some(self.f.get_mut()(&self.cfg, srv).into_future());
|
||||
return self.poll();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
|
@@ -4,7 +4,7 @@ use futures::{Async, Future, IntoFuture, Poll};
|
||||
use crate::{NewService, Service};
|
||||
|
||||
pub type BoxedService<Req, Res, Err> = Box<
|
||||
Service<
|
||||
dyn Service<
|
||||
Request = Req,
|
||||
Response = Res,
|
||||
Error = Err,
|
||||
@@ -13,7 +13,7 @@ pub type BoxedService<Req, Res, Err> = Box<
|
||||
>;
|
||||
|
||||
pub type BoxedServiceResponse<Res, Err> =
|
||||
Either<FutureResult<Res, Err>, Box<Future<Item = Res, Error = Err>>>;
|
||||
Either<FutureResult<Res, Err>, Box<dyn Future<Item = Res, Error = Err>>>;
|
||||
|
||||
pub struct BoxedNewService<C, Req, Res, Err, InitErr>(Inner<C, Req, Res, Err, InitErr>);
|
||||
|
||||
@@ -46,14 +46,14 @@ where
|
||||
}
|
||||
|
||||
type Inner<C, Req, Res, Err, InitErr> = Box<
|
||||
NewService<
|
||||
dyn NewService<
|
||||
Config = C,
|
||||
Request = Req,
|
||||
Response = Res,
|
||||
Error = Err,
|
||||
InitError = InitErr,
|
||||
Service = BoxedService<Req, Res, Err>,
|
||||
Future = Box<Future<Item = BoxedService<Req, Res, Err>, Error = InitErr>>,
|
||||
Future = Box<dyn Future<Item = BoxedService<Req, Res, Err>, Error = InitErr>>,
|
||||
>,
|
||||
>;
|
||||
|
||||
@@ -70,7 +70,7 @@ where
|
||||
type InitError = InitErr;
|
||||
type Config = C;
|
||||
type Service = BoxedService<Req, Res, Err>;
|
||||
type Future = Box<Future<Item = Self::Service, Error = Self::InitError>>;
|
||||
type Future = Box<dyn Future<Item = Self::Service, Error = Self::InitError>>;
|
||||
|
||||
fn new_service(&self, cfg: &C) -> Self::Future {
|
||||
self.0.new_service(cfg)
|
||||
@@ -99,7 +99,7 @@ where
|
||||
type InitError = InitErr;
|
||||
type Config = C;
|
||||
type Service = BoxedService<Req, Res, Err>;
|
||||
type Future = Box<Future<Item = Self::Service, Error = Self::InitError>>;
|
||||
type Future = Box<dyn Future<Item = Self::Service, Error = Self::InitError>>;
|
||||
|
||||
fn new_service(&self, cfg: &C) -> Self::Future {
|
||||
Box::new(
|
||||
@@ -133,7 +133,7 @@ where
|
||||
type Error = Err;
|
||||
type Future = Either<
|
||||
FutureResult<Self::Response, Self::Error>,
|
||||
Box<Future<Item = Self::Response, Error = Self::Error>>,
|
||||
Box<dyn Future<Item = Self::Response, Error = Self::Error>>,
|
||||
>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
|
@@ -34,6 +34,7 @@ impl<T> Cell<T> {
|
||||
unsafe { &mut *self.inner.as_ref().get() }
|
||||
}
|
||||
|
||||
#[allow(clippy::mut_from_ref)]
|
||||
pub(crate) unsafe fn get_mut_unsafe(&self) -> &mut T {
|
||||
&mut *self.inner.as_ref().get()
|
||||
}
|
||||
|
@@ -176,7 +176,7 @@ impl<T: ?Sized> ServiceExt for T where T: Service {}
|
||||
/// Creates new `Service` values.
|
||||
///
|
||||
/// Acts as a service factory. This is useful for cases where new `Service`
|
||||
/// values must be produced. One case is a TCP servier listener. The listner
|
||||
/// values must be produced. One case is a TCP server listener. The listener
|
||||
/// accepts new TCP streams, obtains a new `Service` value using the
|
||||
/// `NewService` trait, and uses that new `Service` value to process inbound
|
||||
/// requests on that new TCP stream.
|
||||
|
@@ -1,9 +1,13 @@
|
||||
# Changes
|
||||
|
||||
## [0.1.2] - 2019-08-05
|
||||
|
||||
### Changed
|
||||
|
||||
* Update `derive_more` to 0.15
|
||||
|
||||
* Update `parking_lot` to 0.9
|
||||
|
||||
## [0.1.1] - 2019-06-05
|
||||
|
||||
* Update parking_lot
|
||||
|
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "actix-threadpool"
|
||||
version = "0.1.1"
|
||||
version = "0.1.2"
|
||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
description = "Actix thread pool for sync code"
|
||||
keywords = ["actix", "network", "framework", "async", "futures"]
|
||||
@@ -20,7 +20,7 @@ path = "src/lib.rs"
|
||||
[dependencies]
|
||||
derive_more = "0.15"
|
||||
futures = "0.1.25"
|
||||
parking_lot = "0.8"
|
||||
parking_lot = "0.9"
|
||||
lazy_static = "1.2"
|
||||
log = "0.4"
|
||||
num_cpus = "1.10"
|
||||
|
@@ -1,5 +1,12 @@
|
||||
# Changes
|
||||
|
||||
## [0.4.5] - 2019-07-19
|
||||
|
||||
### Removed
|
||||
|
||||
* Deprecated `CloneableService` as it is not safe
|
||||
|
||||
|
||||
## [0.4.4] - 2019-07-17
|
||||
|
||||
### Changed
|
||||
|
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "actix-utils"
|
||||
version = "0.4.4"
|
||||
version = "0.4.5"
|
||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
description = "Actix utils - various actix net related services"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
|
@@ -1,55 +0,0 @@
|
||||
#![allow(deprecated)]
|
||||
use std::marker::PhantomData;
|
||||
use std::rc::Rc;
|
||||
|
||||
use actix_service::Service;
|
||||
use futures::Poll;
|
||||
|
||||
use super::cell::Cell;
|
||||
|
||||
#[doc(hidden)]
|
||||
#[deprecated(since = "0.4.3", note = "support will be removed in actix-utils 0.4.5")]
|
||||
/// Service that allows to turn non-clone service to a service with `Clone` impl
|
||||
pub struct CloneableService<T> {
|
||||
service: Cell<T>,
|
||||
_t: PhantomData<Rc<()>>,
|
||||
}
|
||||
|
||||
impl<T> CloneableService<T> {
|
||||
pub fn new(service: T) -> Self
|
||||
where
|
||||
T: Service,
|
||||
{
|
||||
Self {
|
||||
service: Cell::new(service),
|
||||
_t: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Clone for CloneableService<T> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
service: self.service.clone(),
|
||||
_t: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Service for CloneableService<T>
|
||||
where
|
||||
T: Service,
|
||||
{
|
||||
type Request = T::Request;
|
||||
type Response = T::Response;
|
||||
type Error = T::Error;
|
||||
type Future = T::Future;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
self.service.get_mut().poll_ready()
|
||||
}
|
||||
|
||||
fn call(&mut self, req: T::Request) -> Self::Future {
|
||||
self.service.get_mut().call(req)
|
||||
}
|
||||
}
|
@@ -119,7 +119,7 @@ mod tests {
|
||||
type Request = ();
|
||||
type Response = ();
|
||||
type Error = ();
|
||||
type Future = Box<Future<Item = (), Error = ()>>;
|
||||
type Future = Box<dyn Future<Item = (), Error = ()>>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
Ok(Async::Ready(()))
|
||||
|
@@ -1,6 +1,6 @@
|
||||
//! Actix utils - various helper services
|
||||
|
||||
mod cell;
|
||||
pub mod cloneable;
|
||||
pub mod counter;
|
||||
pub mod either;
|
||||
pub mod framed;
|
||||
|
@@ -214,7 +214,7 @@ mod tests {
|
||||
type Request = oneshot::Receiver<usize>;
|
||||
type Response = usize;
|
||||
type Error = ();
|
||||
type Future = Box<Future<Item = usize, Error = ()>>;
|
||||
type Future = Box<dyn Future<Item = usize, Error = ()>>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
Ok(Async::Ready(()))
|
||||
|
@@ -45,7 +45,7 @@ where
|
||||
type Request = S;
|
||||
type Response = ();
|
||||
type Error = E;
|
||||
type Future = Box<Future<Item = (), Error = E>>;
|
||||
type Future = Box<dyn Future<Item = (), Error = E>>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
Ok(Async::Ready(()))
|
||||
|
@@ -190,7 +190,7 @@ mod tests {
|
||||
type Request = ();
|
||||
type Response = ();
|
||||
type Error = ();
|
||||
type Future = Box<Future<Item = (), Error = ()>>;
|
||||
type Future = Box<dyn Future<Item = (), Error = ()>>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
Ok(Async::Ready(()))
|
||||
|
@@ -151,7 +151,7 @@ impl<'de, T: ResourcePath + 'de> Deserializer<'de> for PathDeserializer<'de, T>
|
||||
where
|
||||
V: Visitor<'de>,
|
||||
{
|
||||
if self.path.len() < 1 {
|
||||
if self.path.is_empty() {
|
||||
Err(de::value::Error::custom(
|
||||
"expeceted at least one parameters",
|
||||
))
|
||||
|
@@ -93,7 +93,7 @@ impl<T: ResourcePath> Path<T> {
|
||||
#[inline]
|
||||
/// Skip first `n` chars in path
|
||||
pub fn skip(&mut self, n: u16) {
|
||||
self.skip = self.skip + n;
|
||||
self.skip += n;
|
||||
}
|
||||
|
||||
pub(crate) fn add(&mut self, name: Rc<String>, value: PathItem) {
|
||||
|
@@ -207,7 +207,7 @@ impl ResourceDef {
|
||||
"Dynamic path match but not all segments found: {}",
|
||||
name
|
||||
);
|
||||
false;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@@ -279,7 +279,7 @@ impl ResourceDef {
|
||||
"Dynamic path match but not all segments found: {}",
|
||||
name
|
||||
);
|
||||
false;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
Reference in New Issue
Block a user