mirror of
https://github.com/fafhrd91/actix-net
synced 2024-11-27 19:12:56 +01:00
Add ConnectService and OpensslConnectService
This commit is contained in:
parent
a742768feb
commit
a2643d475a
@ -1,5 +1,9 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
|
## [0.2.3] - 2019-08-05
|
||||||
|
|
||||||
|
* Add `ConnectService` and `OpensslConnectService`
|
||||||
|
|
||||||
## [0.2.2] - 2019-07-24
|
## [0.2.2] - 2019-07-24
|
||||||
|
|
||||||
* Add `rustls` support
|
* Add `rustls` support
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actix-connect"
|
name = "actix-connect"
|
||||||
version = "0.2.2"
|
version = "0.2.3"
|
||||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||||
description = "Actix Connector - tcp connector service"
|
description = "Actix Connector - tcp connector service"
|
||||||
keywords = ["network", "framework", "async", "futures"]
|
keywords = ["network", "framework", "async", "futures"]
|
||||||
|
@ -18,6 +18,17 @@ impl<T> TcpConnectorFactory<T> {
|
|||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
TcpConnectorFactory(PhantomData)
|
TcpConnectorFactory(PhantomData)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Create tcp connector service
|
||||||
|
pub fn service(&self) -> TcpConnector<T> {
|
||||||
|
TcpConnector(PhantomData)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Default for TcpConnectorFactory<T> {
|
||||||
|
fn default() -> Self {
|
||||||
|
TcpConnectorFactory(PhantomData)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Clone for TcpConnectorFactory<T> {
|
impl<T> Clone for TcpConnectorFactory<T> {
|
||||||
@ -36,7 +47,7 @@ impl<T: Address> NewService for TcpConnectorFactory<T> {
|
|||||||
type Future = FutureResult<Self::Service, Self::InitError>;
|
type Future = FutureResult<Self::Service, Self::InitError>;
|
||||||
|
|
||||||
fn new_service(&self, _: &()) -> Self::Future {
|
fn new_service(&self, _: &()) -> Self::Future {
|
||||||
ok(TcpConnector(PhantomData))
|
ok(self.service())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -16,6 +16,7 @@ mod connect;
|
|||||||
mod connector;
|
mod connector;
|
||||||
mod error;
|
mod error;
|
||||||
mod resolver;
|
mod resolver;
|
||||||
|
mod service;
|
||||||
pub mod ssl;
|
pub mod ssl;
|
||||||
|
|
||||||
#[cfg(feature = "uri")]
|
#[cfg(feature = "uri")]
|
||||||
@ -29,6 +30,7 @@ pub use self::connect::{Address, Connect, Connection};
|
|||||||
pub use self::connector::{TcpConnector, TcpConnectorFactory};
|
pub use self::connector::{TcpConnector, TcpConnectorFactory};
|
||||||
pub use self::error::ConnectError;
|
pub use self::error::ConnectError;
|
||||||
pub use self::resolver::{Resolver, ResolverFactory};
|
pub use self::resolver::{Resolver, ResolverFactory};
|
||||||
|
pub use self::service::{ConnectService, ConnectServiceFactory};
|
||||||
|
|
||||||
use actix_service::{NewService, Service, ServiceExt};
|
use actix_service::{NewService, Service, ServiceExt};
|
||||||
use tokio_tcp::TcpStream;
|
use tokio_tcp::TcpStream;
|
||||||
|
@ -25,6 +25,13 @@ impl<T> ResolverFactory<T> {
|
|||||||
_t: PhantomData,
|
_t: PhantomData,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn service(&self) -> Resolver<T> {
|
||||||
|
Resolver {
|
||||||
|
resolver: self.resolver.clone(),
|
||||||
|
_t: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Default for ResolverFactory<T> {
|
impl<T> Default for ResolverFactory<T> {
|
||||||
@ -55,10 +62,7 @@ impl<T: Address> NewService for ResolverFactory<T> {
|
|||||||
type Future = FutureResult<Self::Service, Self::InitError>;
|
type Future = FutureResult<Self::Service, Self::InitError>;
|
||||||
|
|
||||||
fn new_service(&self, _: &()) -> Self::Future {
|
fn new_service(&self, _: &()) -> Self::Future {
|
||||||
ok(Resolver {
|
ok(self.service())
|
||||||
resolver: self.resolver.clone(),
|
|
||||||
_t: PhantomData,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
123
actix-connect/src/service.rs
Normal file
123
actix-connect/src/service.rs
Normal file
@ -0,0 +1,123 @@
|
|||||||
|
use actix_service::{NewService, Service};
|
||||||
|
use futures::future::{ok, FutureResult};
|
||||||
|
use futures::{try_ready, Async, Future, Poll};
|
||||||
|
use tokio_tcp::TcpStream;
|
||||||
|
use trust_dns_resolver::AsyncResolver;
|
||||||
|
|
||||||
|
use crate::connect::{Address, Connect, Connection};
|
||||||
|
use crate::connector::{TcpConnector, TcpConnectorFactory};
|
||||||
|
use crate::error::ConnectError;
|
||||||
|
use crate::resolver::{Resolver, ResolverFactory};
|
||||||
|
|
||||||
|
pub struct ConnectServiceFactory<T> {
|
||||||
|
tcp: TcpConnectorFactory<T>,
|
||||||
|
resolver: ResolverFactory<T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> ConnectServiceFactory<T> {
|
||||||
|
/// Construct new ConnectService factory
|
||||||
|
pub fn new() -> Self {
|
||||||
|
ConnectServiceFactory {
|
||||||
|
tcp: TcpConnectorFactory::default(),
|
||||||
|
resolver: ResolverFactory::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Construct new connect service with custom dns resolver
|
||||||
|
pub fn with_resolver(resolver: AsyncResolver) -> Self {
|
||||||
|
ConnectServiceFactory {
|
||||||
|
tcp: TcpConnectorFactory::default(),
|
||||||
|
resolver: ResolverFactory::new(resolver),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Construct new service
|
||||||
|
pub fn service(&self) -> ConnectService<T> {
|
||||||
|
ConnectService {
|
||||||
|
tcp: self.tcp.service(),
|
||||||
|
resolver: self.resolver.service(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Default for ConnectServiceFactory<T> {
|
||||||
|
fn default() -> Self {
|
||||||
|
ConnectServiceFactory {
|
||||||
|
tcp: TcpConnectorFactory::default(),
|
||||||
|
resolver: ResolverFactory::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Clone for ConnectServiceFactory<T> {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
ConnectServiceFactory {
|
||||||
|
tcp: self.tcp.clone(),
|
||||||
|
resolver: self.resolver.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Address> NewService for ConnectServiceFactory<T> {
|
||||||
|
type Request = Connect<T>;
|
||||||
|
type Response = Connection<T, TcpStream>;
|
||||||
|
type Error = ConnectError;
|
||||||
|
type Config = ();
|
||||||
|
type Service = ConnectService<T>;
|
||||||
|
type InitError = ();
|
||||||
|
type Future = FutureResult<Self::Service, Self::InitError>;
|
||||||
|
|
||||||
|
fn new_service(&self, _: &()) -> Self::Future {
|
||||||
|
ok(self.service())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct ConnectService<T> {
|
||||||
|
tcp: TcpConnector<T>,
|
||||||
|
resolver: Resolver<T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Address> Service for ConnectService<T> {
|
||||||
|
type Request = Connect<T>;
|
||||||
|
type Response = Connection<T, TcpStream>;
|
||||||
|
type Error = ConnectError;
|
||||||
|
type Future = ConnectServiceResponse<T>;
|
||||||
|
|
||||||
|
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||||
|
Ok(Async::Ready(()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn call(&mut self, req: Connect<T>) -> Self::Future {
|
||||||
|
ConnectServiceResponse {
|
||||||
|
fut1: Some(self.resolver.call(req)),
|
||||||
|
fut2: None,
|
||||||
|
tcp: self.tcp.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct ConnectServiceResponse<T: Address> {
|
||||||
|
fut1: Option<<Resolver<T> as Service>::Future>,
|
||||||
|
fut2: Option<<TcpConnector<T> as Service>::Future>,
|
||||||
|
tcp: TcpConnector<T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Address> Future for ConnectServiceResponse<T> {
|
||||||
|
type Item = Connection<T, TcpStream>;
|
||||||
|
type Error = ConnectError;
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
|
if let Some(ref mut fut) = self.fut1 {
|
||||||
|
let res = try_ready!(fut.poll());
|
||||||
|
let _ = self.fut1.take();
|
||||||
|
self.fut2 = Some(self.tcp.call(res));
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(ref mut fut) = self.fut2 {
|
||||||
|
return fut.poll();
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(Async::NotReady)
|
||||||
|
}
|
||||||
|
}
|
@ -3,7 +3,9 @@
|
|||||||
#[cfg(feature = "ssl")]
|
#[cfg(feature = "ssl")]
|
||||||
mod openssl;
|
mod openssl;
|
||||||
#[cfg(feature = "ssl")]
|
#[cfg(feature = "ssl")]
|
||||||
pub use self::openssl::OpensslConnector;
|
pub use self::openssl::{
|
||||||
|
OpensslConnectService, OpensslConnectServiceFactory, OpensslConnector,
|
||||||
|
};
|
||||||
#[cfg(feature = "rust-tls")]
|
#[cfg(feature = "rust-tls")]
|
||||||
mod rustls;
|
mod rustls;
|
||||||
#[cfg(feature = "rust-tls")]
|
#[cfg(feature = "rust-tls")]
|
||||||
|
@ -1,13 +1,17 @@
|
|||||||
use std::fmt;
|
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
|
use std::{fmt, io};
|
||||||
|
|
||||||
use actix_codec::{AsyncRead, AsyncWrite};
|
use actix_codec::{AsyncRead, AsyncWrite};
|
||||||
use actix_service::{NewService, Service};
|
use actix_service::{NewService, Service};
|
||||||
use futures::{future::ok, future::FutureResult, Async, Future, Poll};
|
use futures::{future::ok, future::FutureResult, try_ready, Async, Future, Poll};
|
||||||
use openssl::ssl::{HandshakeError, SslConnector};
|
use openssl::ssl::{HandshakeError, SslConnector};
|
||||||
use tokio_openssl::{ConnectAsync, SslConnectorExt, SslStream};
|
use tokio_openssl::{ConnectAsync, SslConnectorExt, SslStream};
|
||||||
|
use tokio_tcp::TcpStream;
|
||||||
|
use trust_dns_resolver::AsyncResolver;
|
||||||
|
|
||||||
use crate::{Address, Connection};
|
use crate::{
|
||||||
|
Address, Connect, ConnectError, ConnectService, ConnectServiceFactory, Connection,
|
||||||
|
};
|
||||||
|
|
||||||
/// Openssl connector factory
|
/// Openssl connector factory
|
||||||
pub struct OpensslConnector<T, U> {
|
pub struct OpensslConnector<T, U> {
|
||||||
@ -77,6 +81,15 @@ pub struct OpensslConnectorService<T, U> {
|
|||||||
_t: PhantomData<(T, U)>,
|
_t: PhantomData<(T, U)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<T, U> Clone for OpensslConnectorService<T, U> {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self {
|
||||||
|
connector: self.connector.clone(),
|
||||||
|
_t: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl<T: Address, U> Service for OpensslConnectorService<T, U>
|
impl<T: Address, U> Service for OpensslConnectorService<T, U>
|
||||||
where
|
where
|
||||||
U: AsyncRead + AsyncWrite + fmt::Debug,
|
U: AsyncRead + AsyncWrite + fmt::Debug,
|
||||||
@ -126,3 +139,113 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct OpensslConnectServiceFactory<T> {
|
||||||
|
tcp: ConnectServiceFactory<T>,
|
||||||
|
openssl: OpensslConnector<T, TcpStream>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> OpensslConnectServiceFactory<T> {
|
||||||
|
/// Construct new OpensslConnectService factory
|
||||||
|
pub fn new(connector: SslConnector) -> Self {
|
||||||
|
OpensslConnectServiceFactory {
|
||||||
|
tcp: ConnectServiceFactory::default(),
|
||||||
|
openssl: OpensslConnector::new(connector),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Construct new connect service with custom dns resolver
|
||||||
|
pub fn with_resolver(connector: SslConnector, resolver: AsyncResolver) -> Self {
|
||||||
|
OpensslConnectServiceFactory {
|
||||||
|
tcp: ConnectServiceFactory::with_resolver(resolver),
|
||||||
|
openssl: OpensslConnector::new(connector),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Construct openssl connect service
|
||||||
|
pub fn service(&self) -> OpensslConnectService<T> {
|
||||||
|
OpensslConnectService {
|
||||||
|
tcp: self.tcp.service(),
|
||||||
|
openssl: OpensslConnectorService {
|
||||||
|
connector: self.openssl.connector.clone(),
|
||||||
|
_t: PhantomData,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Clone for OpensslConnectServiceFactory<T> {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
OpensslConnectServiceFactory {
|
||||||
|
tcp: self.tcp.clone(),
|
||||||
|
openssl: self.openssl.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Address> NewService for OpensslConnectServiceFactory<T> {
|
||||||
|
type Request = Connect<T>;
|
||||||
|
type Response = SslStream<TcpStream>;
|
||||||
|
type Error = ConnectError;
|
||||||
|
type Config = ();
|
||||||
|
type Service = OpensslConnectService<T>;
|
||||||
|
type InitError = ();
|
||||||
|
type Future = FutureResult<Self::Service, Self::InitError>;
|
||||||
|
|
||||||
|
fn new_service(&self, _: &()) -> Self::Future {
|
||||||
|
ok(self.service())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct OpensslConnectService<T> {
|
||||||
|
tcp: ConnectService<T>,
|
||||||
|
openssl: OpensslConnectorService<T, TcpStream>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Address> Service for OpensslConnectService<T> {
|
||||||
|
type Request = Connect<T>;
|
||||||
|
type Response = SslStream<TcpStream>;
|
||||||
|
type Error = ConnectError;
|
||||||
|
type Future = OpensslConnectServiceResponse<T>;
|
||||||
|
|
||||||
|
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||||
|
Ok(Async::Ready(()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn call(&mut self, req: Connect<T>) -> Self::Future {
|
||||||
|
OpensslConnectServiceResponse {
|
||||||
|
fut1: Some(self.tcp.call(req)),
|
||||||
|
fut2: None,
|
||||||
|
openssl: self.openssl.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct OpensslConnectServiceResponse<T: Address> {
|
||||||
|
fut1: Option<<ConnectService<T> as Service>::Future>,
|
||||||
|
fut2: Option<<OpensslConnectorService<T, TcpStream> as Service>::Future>,
|
||||||
|
openssl: OpensslConnectorService<T, TcpStream>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Address> Future for OpensslConnectServiceResponse<T> {
|
||||||
|
type Item = SslStream<TcpStream>;
|
||||||
|
type Error = ConnectError;
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
|
if let Some(ref mut fut) = self.fut1 {
|
||||||
|
let res = try_ready!(fut.poll());
|
||||||
|
let _ = self.fut1.take();
|
||||||
|
self.fut2 = Some(self.openssl.call(res));
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(ref mut fut) = self.fut2 {
|
||||||
|
let connect = try_ready!(fut
|
||||||
|
.poll()
|
||||||
|
.map_err(|e| ConnectError::Io(io::Error::new(io::ErrorKind::Other, e))));
|
||||||
|
Ok(Async::Ready(connect.into_parts().0))
|
||||||
|
} else {
|
||||||
|
Ok(Async::NotReady)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user