1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-08-12 23:27:05 +02:00

Compare commits

..

6 Commits

Author SHA1 Message Date
Nikolay Kim
62e429cb0c Merge branch 'master' of github.com:actix/actix-net 2019-08-05 09:53:03 -07:00
Nikolay Kim
a2643d475a Add ConnectService and OpensslConnectService 2019-08-05 09:52:50 -07:00
Sven-Hendrik Haase
34c259a8b5 Merge pull request #35 from ignatenkobrain/parking_lot
threadpool: Update parking_lot to 0.9
2019-08-05 17:30:19 +02:00
Igor Gnatenko
8b398c3386 threadpool: Update parking_lot to 0.9
Signed-off-by: Igor Gnatenko <i.gnatenko.brain@gmail.com>
2019-08-04 15:46:14 +02:00
Neil Locketz
0baceb0e56 Fix typo in desc (#34) 2019-07-30 09:35:57 -07:00
Michael Snoyman
6be1f37f6c Minor typo corrections in docs (#33) 2019-07-25 11:46:11 +06:00
12 changed files with 283 additions and 13 deletions

View File

@@ -2,7 +2,7 @@
name = "actix-net"
version = "0.3.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix net - framework for the compisible network services for Rust"
description = "Actix net - framework for the composable network services for Rust"
readme = "README.md"
keywords = ["network", "framework", "async", "futures"]
homepage = "https://actix.rs"

View File

@@ -1,5 +1,9 @@
# Changes
## [0.2.3] - 2019-08-05
* Add `ConnectService` and `OpensslConnectService`
## [0.2.2] - 2019-07-24
* Add `rustls` support

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-connect"
version = "0.2.2"
version = "0.2.3"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix Connector - tcp connector service"
keywords = ["network", "framework", "async", "futures"]

View File

@@ -18,6 +18,17 @@ impl<T> TcpConnectorFactory<T> {
pub fn new() -> Self {
TcpConnectorFactory(PhantomData)
}
/// Create tcp connector service
pub fn service(&self) -> TcpConnector<T> {
TcpConnector(PhantomData)
}
}
impl<T> Default for TcpConnectorFactory<T> {
fn default() -> Self {
TcpConnectorFactory(PhantomData)
}
}
impl<T> Clone for TcpConnectorFactory<T> {
@@ -36,7 +47,7 @@ impl<T: Address> NewService for TcpConnectorFactory<T> {
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self, _: &()) -> Self::Future {
ok(TcpConnector(PhantomData))
ok(self.service())
}
}

View File

@@ -16,6 +16,7 @@ mod connect;
mod connector;
mod error;
mod resolver;
mod service;
pub mod ssl;
#[cfg(feature = "uri")]
@@ -29,6 +30,7 @@ pub use self::connect::{Address, Connect, Connection};
pub use self::connector::{TcpConnector, TcpConnectorFactory};
pub use self::error::ConnectError;
pub use self::resolver::{Resolver, ResolverFactory};
pub use self::service::{ConnectService, ConnectServiceFactory};
use actix_service::{NewService, Service, ServiceExt};
use tokio_tcp::TcpStream;

View File

@@ -25,6 +25,13 @@ impl<T> ResolverFactory<T> {
_t: PhantomData,
}
}
pub fn service(&self) -> Resolver<T> {
Resolver {
resolver: self.resolver.clone(),
_t: PhantomData,
}
}
}
impl<T> Default for ResolverFactory<T> {
@@ -55,10 +62,7 @@ impl<T: Address> NewService for ResolverFactory<T> {
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self, _: &()) -> Self::Future {
ok(Resolver {
resolver: self.resolver.clone(),
_t: PhantomData,
})
ok(self.service())
}
}

View File

@@ -0,0 +1,123 @@
use actix_service::{NewService, Service};
use futures::future::{ok, FutureResult};
use futures::{try_ready, Async, Future, Poll};
use tokio_tcp::TcpStream;
use trust_dns_resolver::AsyncResolver;
use crate::connect::{Address, Connect, Connection};
use crate::connector::{TcpConnector, TcpConnectorFactory};
use crate::error::ConnectError;
use crate::resolver::{Resolver, ResolverFactory};
pub struct ConnectServiceFactory<T> {
tcp: TcpConnectorFactory<T>,
resolver: ResolverFactory<T>,
}
impl<T> ConnectServiceFactory<T> {
/// Construct new ConnectService factory
pub fn new() -> Self {
ConnectServiceFactory {
tcp: TcpConnectorFactory::default(),
resolver: ResolverFactory::default(),
}
}
/// Construct new connect service with custom dns resolver
pub fn with_resolver(resolver: AsyncResolver) -> Self {
ConnectServiceFactory {
tcp: TcpConnectorFactory::default(),
resolver: ResolverFactory::new(resolver),
}
}
/// Construct new service
pub fn service(&self) -> ConnectService<T> {
ConnectService {
tcp: self.tcp.service(),
resolver: self.resolver.service(),
}
}
}
impl<T> Default for ConnectServiceFactory<T> {
fn default() -> Self {
ConnectServiceFactory {
tcp: TcpConnectorFactory::default(),
resolver: ResolverFactory::default(),
}
}
}
impl<T> Clone for ConnectServiceFactory<T> {
fn clone(&self) -> Self {
ConnectServiceFactory {
tcp: self.tcp.clone(),
resolver: self.resolver.clone(),
}
}
}
impl<T: Address> NewService for ConnectServiceFactory<T> {
type Request = Connect<T>;
type Response = Connection<T, TcpStream>;
type Error = ConnectError;
type Config = ();
type Service = ConnectService<T>;
type InitError = ();
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self, _: &()) -> Self::Future {
ok(self.service())
}
}
#[derive(Clone)]
pub struct ConnectService<T> {
tcp: TcpConnector<T>,
resolver: Resolver<T>,
}
impl<T: Address> Service for ConnectService<T> {
type Request = Connect<T>;
type Response = Connection<T, TcpStream>;
type Error = ConnectError;
type Future = ConnectServiceResponse<T>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, req: Connect<T>) -> Self::Future {
ConnectServiceResponse {
fut1: Some(self.resolver.call(req)),
fut2: None,
tcp: self.tcp.clone(),
}
}
}
pub struct ConnectServiceResponse<T: Address> {
fut1: Option<<Resolver<T> as Service>::Future>,
fut2: Option<<TcpConnector<T> as Service>::Future>,
tcp: TcpConnector<T>,
}
impl<T: Address> Future for ConnectServiceResponse<T> {
type Item = Connection<T, TcpStream>;
type Error = ConnectError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(ref mut fut) = self.fut1 {
let res = try_ready!(fut.poll());
let _ = self.fut1.take();
self.fut2 = Some(self.tcp.call(res));
}
if let Some(ref mut fut) = self.fut2 {
return fut.poll();
}
Ok(Async::NotReady)
}
}

View File

@@ -3,7 +3,9 @@
#[cfg(feature = "ssl")]
mod openssl;
#[cfg(feature = "ssl")]
pub use self::openssl::OpensslConnector;
pub use self::openssl::{
OpensslConnectService, OpensslConnectServiceFactory, OpensslConnector,
};
#[cfg(feature = "rust-tls")]
mod rustls;
#[cfg(feature = "rust-tls")]

View File

@@ -1,13 +1,17 @@
use std::fmt;
use std::marker::PhantomData;
use std::{fmt, io};
use actix_codec::{AsyncRead, AsyncWrite};
use actix_service::{NewService, Service};
use futures::{future::ok, future::FutureResult, Async, Future, Poll};
use futures::{future::ok, future::FutureResult, try_ready, Async, Future, Poll};
use openssl::ssl::{HandshakeError, SslConnector};
use tokio_openssl::{ConnectAsync, SslConnectorExt, SslStream};
use tokio_tcp::TcpStream;
use trust_dns_resolver::AsyncResolver;
use crate::{Address, Connection};
use crate::{
Address, Connect, ConnectError, ConnectService, ConnectServiceFactory, Connection,
};
/// Openssl connector factory
pub struct OpensslConnector<T, U> {
@@ -77,6 +81,15 @@ pub struct OpensslConnectorService<T, U> {
_t: PhantomData<(T, U)>,
}
impl<T, U> Clone for OpensslConnectorService<T, U> {
fn clone(&self) -> Self {
Self {
connector: self.connector.clone(),
_t: PhantomData,
}
}
}
impl<T: Address, U> Service for OpensslConnectorService<T, U>
where
U: AsyncRead + AsyncWrite + fmt::Debug,
@@ -126,3 +139,113 @@ where
}
}
}
pub struct OpensslConnectServiceFactory<T> {
tcp: ConnectServiceFactory<T>,
openssl: OpensslConnector<T, TcpStream>,
}
impl<T> OpensslConnectServiceFactory<T> {
/// Construct new OpensslConnectService factory
pub fn new(connector: SslConnector) -> Self {
OpensslConnectServiceFactory {
tcp: ConnectServiceFactory::default(),
openssl: OpensslConnector::new(connector),
}
}
/// Construct new connect service with custom dns resolver
pub fn with_resolver(connector: SslConnector, resolver: AsyncResolver) -> Self {
OpensslConnectServiceFactory {
tcp: ConnectServiceFactory::with_resolver(resolver),
openssl: OpensslConnector::new(connector),
}
}
/// Construct openssl connect service
pub fn service(&self) -> OpensslConnectService<T> {
OpensslConnectService {
tcp: self.tcp.service(),
openssl: OpensslConnectorService {
connector: self.openssl.connector.clone(),
_t: PhantomData,
},
}
}
}
impl<T> Clone for OpensslConnectServiceFactory<T> {
fn clone(&self) -> Self {
OpensslConnectServiceFactory {
tcp: self.tcp.clone(),
openssl: self.openssl.clone(),
}
}
}
impl<T: Address> NewService for OpensslConnectServiceFactory<T> {
type Request = Connect<T>;
type Response = SslStream<TcpStream>;
type Error = ConnectError;
type Config = ();
type Service = OpensslConnectService<T>;
type InitError = ();
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self, _: &()) -> Self::Future {
ok(self.service())
}
}
#[derive(Clone)]
pub struct OpensslConnectService<T> {
tcp: ConnectService<T>,
openssl: OpensslConnectorService<T, TcpStream>,
}
impl<T: Address> Service for OpensslConnectService<T> {
type Request = Connect<T>;
type Response = SslStream<TcpStream>;
type Error = ConnectError;
type Future = OpensslConnectServiceResponse<T>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, req: Connect<T>) -> Self::Future {
OpensslConnectServiceResponse {
fut1: Some(self.tcp.call(req)),
fut2: None,
openssl: self.openssl.clone(),
}
}
}
pub struct OpensslConnectServiceResponse<T: Address> {
fut1: Option<<ConnectService<T> as Service>::Future>,
fut2: Option<<OpensslConnectorService<T, TcpStream> as Service>::Future>,
openssl: OpensslConnectorService<T, TcpStream>,
}
impl<T: Address> Future for OpensslConnectServiceResponse<T> {
type Item = SslStream<TcpStream>;
type Error = ConnectError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(ref mut fut) = self.fut1 {
let res = try_ready!(fut.poll());
let _ = self.fut1.take();
self.fut2 = Some(self.openssl.call(res));
}
if let Some(ref mut fut) = self.fut2 {
let connect = try_ready!(fut
.poll()
.map_err(|e| ConnectError::Io(io::Error::new(io::ErrorKind::Other, e))));
Ok(Async::Ready(connect.into_parts().0))
} else {
Ok(Async::NotReady)
}
}
}

View File

@@ -176,7 +176,7 @@ impl<T: ?Sized> ServiceExt for T where T: Service {}
/// Creates new `Service` values.
///
/// Acts as a service factory. This is useful for cases where new `Service`
/// values must be produced. One case is a TCP servier listener. The listner
/// values must be produced. One case is a TCP server listener. The listener
/// accepts new TCP streams, obtains a new `Service` value using the
/// `NewService` trait, and uses that new `Service` value to process inbound
/// requests on that new TCP stream.

View File

@@ -3,6 +3,7 @@
### Changed
* Update `derive_more` to 0.15
* Update `parking_lot` to 0.9
## [0.1.1] - 2019-06-05

View File

@@ -20,7 +20,7 @@ path = "src/lib.rs"
[dependencies]
derive_more = "0.15"
futures = "0.1.25"
parking_lot = "0.8"
parking_lot = "0.9"
lazy_static = "1.2"
log = "0.4"
num_cpus = "1.10"