mirror of
https://github.com/fafhrd91/actix-net
synced 2024-12-02 19:52:24 +01:00
refactor Connect type and add tests
This commit is contained in:
parent
6ebff22601
commit
38545dedc7
@ -41,3 +41,11 @@ trust-dns-resolver = { version="0.11.0-alpha.2", default-features = false }
|
|||||||
# openssl
|
# openssl
|
||||||
openssl = { version="0.10", optional = true }
|
openssl = { version="0.10", optional = true }
|
||||||
tokio-openssl = { version="0.3", optional = true }
|
tokio-openssl = { version="0.3", optional = true }
|
||||||
|
|
||||||
|
[dev-dependencies]
|
||||||
|
bytes = "0.4"
|
||||||
|
actix-connect = { path=".", features=["ssl"] }
|
||||||
|
actix-test-server = { version="0.2.0", features=["ssl"] }
|
||||||
|
actix-server-config = "0.1.0"
|
||||||
|
actix-utils = "0.3.4"
|
||||||
|
tokio-tcp = "0.1"
|
@ -4,34 +4,32 @@ use std::net::SocketAddr;
|
|||||||
|
|
||||||
use either::Either;
|
use either::Either;
|
||||||
|
|
||||||
use crate::error::ConnectError;
|
|
||||||
|
|
||||||
/// Connect request
|
/// Connect request
|
||||||
pub trait Address {
|
pub trait Address {
|
||||||
/// Host name of the request
|
/// Host name of the request
|
||||||
fn host(&self) -> &str;
|
fn host(&self) -> &str;
|
||||||
|
|
||||||
/// Port of the request
|
/// Port of the request
|
||||||
fn port(&self) -> u16;
|
fn port(&self) -> Option<u16>;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Address for (String, u16) {
|
impl Address for String {
|
||||||
fn host(&self) -> &str {
|
fn host(&self) -> &str {
|
||||||
&self.0
|
&self
|
||||||
}
|
}
|
||||||
|
|
||||||
fn port(&self) -> u16 {
|
fn port(&self) -> Option<u16> {
|
||||||
self.1
|
None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Address for (&'static str, u16) {
|
impl Address for &'static str {
|
||||||
fn host(&self) -> &str {
|
fn host(&self) -> &str {
|
||||||
self.0
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
fn port(&self) -> u16 {
|
fn port(&self) -> Option<u16> {
|
||||||
self.1
|
None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -39,61 +37,26 @@ impl Address for (&'static str, u16) {
|
|||||||
#[derive(Eq, PartialEq, Debug, Hash)]
|
#[derive(Eq, PartialEq, Debug, Hash)]
|
||||||
pub struct Connect<T> {
|
pub struct Connect<T> {
|
||||||
pub(crate) req: T,
|
pub(crate) req: T,
|
||||||
|
pub(crate) port: u16,
|
||||||
pub(crate) addr: Option<Either<SocketAddr, VecDeque<SocketAddr>>>,
|
pub(crate) addr: Option<Either<SocketAddr, VecDeque<SocketAddr>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Connect<(&'static str, u16)> {
|
|
||||||
/// Create new `Connect` instance.
|
|
||||||
pub fn new(host: &'static str, port: u16) -> Connect<(&'static str, u16)> {
|
|
||||||
Connect {
|
|
||||||
req: (host, port),
|
|
||||||
addr: None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Connect<()> {
|
|
||||||
/// Create new `Connect` instance.
|
|
||||||
pub fn from_string(host: String, port: u16) -> Connect<(String, u16)> {
|
|
||||||
Connect {
|
|
||||||
req: (host, port),
|
|
||||||
addr: None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Create new `Connect` instance.
|
|
||||||
pub fn from_static(host: &'static str, port: u16) -> Connect<(&'static str, u16)> {
|
|
||||||
Connect {
|
|
||||||
req: (host, port),
|
|
||||||
addr: None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Create `Connect` instance by spliting the string by ':' and convert the second part to u16
|
|
||||||
pub fn from_str<T: AsRef<str>>(host: T) -> Result<Connect<(String, u16)>, ConnectError> {
|
|
||||||
let mut parts_iter = host.as_ref().splitn(2, ':');
|
|
||||||
let host = parts_iter.next().ok_or(ConnectError::InvalidInput)?;
|
|
||||||
let port_str = parts_iter.next().unwrap_or("");
|
|
||||||
let port = port_str
|
|
||||||
.parse::<u16>()
|
|
||||||
.map_err(|_| ConnectError::InvalidInput)?;
|
|
||||||
Ok(Connect {
|
|
||||||
req: (host.to_owned(), port),
|
|
||||||
addr: None,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: Address> Connect<T> {
|
impl<T: Address> Connect<T> {
|
||||||
/// Create new `Connect` instance.
|
/// Create `Connect` instance by spliting the string by ':' and convert the second part to u16
|
||||||
pub fn with(req: T) -> Connect<T> {
|
pub fn new(req: T) -> Connect<T> {
|
||||||
Connect { req, addr: None }
|
let (_, port) = parse(req.host());
|
||||||
|
Connect {
|
||||||
|
req,
|
||||||
|
port: port.unwrap_or(0),
|
||||||
|
addr: None,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create new `Connect` instance from host and address. Connector skips name resolution stage for such connect messages.
|
/// Create new `Connect` instance from host and address. Connector skips name resolution stage for such connect messages.
|
||||||
pub fn with_address(req: T, addr: SocketAddr) -> Connect<T> {
|
pub fn with(req: T, addr: SocketAddr) -> Connect<T> {
|
||||||
Connect {
|
Connect {
|
||||||
req,
|
req,
|
||||||
|
port: 0,
|
||||||
addr: Some(Either::Left(addr)),
|
addr: Some(Either::Left(addr)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -105,7 +68,13 @@ impl<T: Address> Connect<T> {
|
|||||||
|
|
||||||
/// Port of the request
|
/// Port of the request
|
||||||
pub fn port(&self) -> u16 {
|
pub fn port(&self) -> u16 {
|
||||||
self.req.port()
|
self.req.port().unwrap_or(self.port)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Address> From<T> for Connect<T> {
|
||||||
|
fn from(addr: T) -> Self {
|
||||||
|
Connect::new(addr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -115,6 +84,20 @@ impl<T: Address> fmt::Display for Connect<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn parse(host: &str) -> (&str, Option<u16>) {
|
||||||
|
let mut parts_iter = host.splitn(2, ':');
|
||||||
|
if let Some(host) = parts_iter.next() {
|
||||||
|
let port_str = parts_iter.next().unwrap_or("");
|
||||||
|
if let Ok(port) = port_str.parse::<u16>() {
|
||||||
|
(host, Some(port))
|
||||||
|
} else {
|
||||||
|
(host, None)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
(host, None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub struct Connection<T, U> {
|
pub struct Connection<T, U> {
|
||||||
io: U,
|
io: U,
|
||||||
req: T,
|
req: T,
|
||||||
|
@ -66,10 +66,11 @@ impl<T: Address> Service for Connector<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn call(&mut self, req: Connect<T>) -> Self::Future {
|
fn call(&mut self, req: Connect<T>) -> Self::Future {
|
||||||
let Connect { req, addr } = req;
|
let port = req.port();
|
||||||
|
let Connect { req, addr, .. } = req;
|
||||||
|
|
||||||
if let Some(addr) = addr {
|
if let Some(addr) = addr {
|
||||||
Either::A(ConnectorResponse::new(req, addr))
|
Either::A(ConnectorResponse::new(req, port, addr))
|
||||||
} else {
|
} else {
|
||||||
error!("TCP connector: got unresolved address");
|
error!("TCP connector: got unresolved address");
|
||||||
Either::B(err(ConnectError::Unresolverd))
|
Either::B(err(ConnectError::Unresolverd))
|
||||||
@ -81,6 +82,7 @@ impl<T: Address> Service for Connector<T> {
|
|||||||
/// Tcp stream connector response future
|
/// Tcp stream connector response future
|
||||||
pub struct ConnectorResponse<T> {
|
pub struct ConnectorResponse<T> {
|
||||||
req: Option<T>,
|
req: Option<T>,
|
||||||
|
port: u16,
|
||||||
addrs: Option<VecDeque<SocketAddr>>,
|
addrs: Option<VecDeque<SocketAddr>>,
|
||||||
stream: Option<ConnectFuture>,
|
stream: Option<ConnectFuture>,
|
||||||
}
|
}
|
||||||
@ -88,18 +90,25 @@ pub struct ConnectorResponse<T> {
|
|||||||
impl<T: Address> ConnectorResponse<T> {
|
impl<T: Address> ConnectorResponse<T> {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
req: T,
|
req: T,
|
||||||
|
port: u16,
|
||||||
addr: either::Either<SocketAddr, VecDeque<SocketAddr>>,
|
addr: either::Either<SocketAddr, VecDeque<SocketAddr>>,
|
||||||
) -> ConnectorResponse<T> {
|
) -> ConnectorResponse<T> {
|
||||||
trace!("TCP connector - connecting to {:?}", req.host());
|
trace!(
|
||||||
|
"TCP connector - connecting to {:?} port:{}",
|
||||||
|
req.host(),
|
||||||
|
port
|
||||||
|
);
|
||||||
|
|
||||||
match addr {
|
match addr {
|
||||||
either::Either::Left(addr) => ConnectorResponse {
|
either::Either::Left(addr) => ConnectorResponse {
|
||||||
req: Some(req),
|
req: Some(req),
|
||||||
|
port,
|
||||||
addrs: None,
|
addrs: None,
|
||||||
stream: Some(TcpStream::connect(&addr)),
|
stream: Some(TcpStream::connect(&addr)),
|
||||||
},
|
},
|
||||||
either::Either::Right(addrs) => ConnectorResponse {
|
either::Either::Right(addrs) => ConnectorResponse {
|
||||||
req: Some(req),
|
req: Some(req),
|
||||||
|
port,
|
||||||
addrs: Some(addrs),
|
addrs: Some(addrs),
|
||||||
stream: None,
|
stream: None,
|
||||||
},
|
},
|
||||||
@ -129,7 +138,7 @@ impl<T: Address> Future for ConnectorResponse<T> {
|
|||||||
trace!(
|
trace!(
|
||||||
"TCP connector - failed to connect to connecting to {:?} port: {}",
|
"TCP connector - failed to connect to connecting to {:?} port: {}",
|
||||||
self.req.as_ref().unwrap().host(),
|
self.req.as_ref().unwrap().host(),
|
||||||
self.req.as_ref().unwrap().port(),
|
self.port,
|
||||||
);
|
);
|
||||||
if self.addrs.as_ref().unwrap().is_empty() {
|
if self.addrs.as_ref().unwrap().is_empty() {
|
||||||
return Err(err.into());
|
return Err(err.into());
|
||||||
@ -145,49 +154,3 @@ impl<T: Address> Future for ConnectorResponse<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// #[derive(Clone)]
|
|
||||||
// pub struct DefaultConnector(Connector);
|
|
||||||
|
|
||||||
// impl Default for DefaultConnector {
|
|
||||||
// fn default() -> Self {
|
|
||||||
// DefaultConnector(Connector::default())
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// impl DefaultConnector {
|
|
||||||
// pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self {
|
|
||||||
// DefaultConnector(Connector::new(cfg, opts))
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// impl Service for DefaultConnector {
|
|
||||||
// type Request = Connect;
|
|
||||||
// type Response = TcpStream;
|
|
||||||
// type Error = ConnectorError;
|
|
||||||
// type Future = DefaultConnectorFuture;
|
|
||||||
|
|
||||||
// fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
|
||||||
// self.0.poll_ready()
|
|
||||||
// }
|
|
||||||
|
|
||||||
// fn call(&mut self, req: Connect) -> Self::Future {
|
|
||||||
// DefaultConnectorFuture {
|
|
||||||
// fut: self.0.call(req),
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// #[doc(hidden)]
|
|
||||||
// pub struct DefaultConnectorFuture {
|
|
||||||
// fut: Either<ConnectorFuture, ConnectorTcpFuture>,
|
|
||||||
// }
|
|
||||||
|
|
||||||
// impl Future for DefaultConnectorFuture {
|
|
||||||
// type Item = TcpStream;
|
|
||||||
// type Error = ConnectorError;
|
|
||||||
|
|
||||||
// fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
|
||||||
// Ok(Async::Ready(try_ready!(self.fut.poll()).1))
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
@ -14,7 +14,7 @@ mod error;
|
|||||||
mod resolver;
|
mod resolver;
|
||||||
pub mod ssl;
|
pub mod ssl;
|
||||||
|
|
||||||
pub use trust_dns_resolver::error::ResolveError;
|
pub use trust_dns_resolver::{error::ResolveError, AsyncResolver};
|
||||||
|
|
||||||
pub use self::connect::{Address, Connect, Connection};
|
pub use self::connect::{Address, Connect, Connection};
|
||||||
pub use self::connector::{Connector, ConnectorFactory};
|
pub use self::connector::{Connector, ConnectorFactory};
|
||||||
@ -24,34 +24,51 @@ pub use self::resolver::{Resolver, ResolverFactory};
|
|||||||
use actix_service::{NewService, Service, ServiceExt};
|
use actix_service::{NewService, Service, ServiceExt};
|
||||||
use tokio_tcp::TcpStream;
|
use tokio_tcp::TcpStream;
|
||||||
use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
|
use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
|
||||||
|
use trust_dns_resolver::system_conf::read_system_conf;
|
||||||
|
|
||||||
|
pub fn start_resolver(cfg: ResolverConfig, opts: ResolverOpts) -> AsyncResolver {
|
||||||
|
let (resolver, bg) = AsyncResolver::new(cfg, opts);
|
||||||
|
tokio_current_thread::spawn(bg);
|
||||||
|
resolver
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn start_default_resolver() -> AsyncResolver {
|
||||||
|
let (cfg, opts) = if let Ok((cfg, opts)) = read_system_conf() {
|
||||||
|
(cfg, opts)
|
||||||
|
} else {
|
||||||
|
(ResolverConfig::default(), ResolverOpts::default())
|
||||||
|
};
|
||||||
|
|
||||||
|
let (resolver, bg) = AsyncResolver::new(cfg, opts);
|
||||||
|
tokio_current_thread::spawn(bg);
|
||||||
|
resolver
|
||||||
|
}
|
||||||
|
|
||||||
/// Create tcp connector service
|
/// Create tcp connector service
|
||||||
pub fn new_connector<T: Address>(
|
pub fn new_connector<T: Address>(
|
||||||
cfg: ResolverConfig,
|
resolver: AsyncResolver,
|
||||||
opts: ResolverOpts,
|
|
||||||
) -> impl Service<Request = Connect<T>, Response = Connection<T, TcpStream>, Error = ConnectError>
|
) -> impl Service<Request = Connect<T>, Response = Connection<T, TcpStream>, Error = ConnectError>
|
||||||
+ Clone {
|
+ Clone {
|
||||||
Resolver::new(cfg, opts).and_then(Connector::new())
|
Resolver::new(resolver).and_then(Connector::new())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create tcp connector service
|
/// Create tcp connector service
|
||||||
pub fn new_connector_factory<T: Address>(
|
pub fn new_connector_factory<T: Address>(
|
||||||
cfg: ResolverConfig,
|
resolver: AsyncResolver,
|
||||||
opts: ResolverOpts,
|
|
||||||
) -> impl NewService<
|
) -> impl NewService<
|
||||||
Request = Connect<T>,
|
Request = Connect<T>,
|
||||||
Response = Connection<T, TcpStream>,
|
Response = Connection<T, TcpStream>,
|
||||||
Error = ConnectError,
|
Error = ConnectError,
|
||||||
InitError = (),
|
InitError = (),
|
||||||
> + Clone {
|
> + Clone {
|
||||||
ResolverFactory::new(cfg, opts).and_then(ConnectorFactory::new())
|
ResolverFactory::new(resolver).and_then(ConnectorFactory::new())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create connector service with default parameters
|
/// Create connector service with default parameters
|
||||||
pub fn default_connector<T: Address>(
|
pub fn default_connector<T: Address>(
|
||||||
) -> impl Service<Request = Connect<T>, Response = Connection<T, TcpStream>, Error = ConnectError>
|
) -> impl Service<Request = Connect<T>, Response = Connection<T, TcpStream>, Error = ConnectError>
|
||||||
+ Clone {
|
+ Clone {
|
||||||
Resolver::default().and_then(Connector::new())
|
Resolver::new(start_default_resolver()).and_then(Connector::new())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create connector service factory with default parameters
|
/// Create connector service factory with default parameters
|
||||||
@ -61,5 +78,5 @@ pub fn default_connector_factory<T: Address>() -> impl NewService<
|
|||||||
Error = ConnectError,
|
Error = ConnectError,
|
||||||
InitError = (),
|
InitError = (),
|
||||||
> + Clone {
|
> + Clone {
|
||||||
ResolverFactory::default().and_then(ConnectorFactory::new())
|
ResolverFactory::new(start_default_resolver()).and_then(ConnectorFactory::new())
|
||||||
}
|
}
|
||||||
|
@ -5,9 +5,7 @@ use std::net::SocketAddr;
|
|||||||
use actix_service::{NewService, Service};
|
use actix_service::{NewService, Service};
|
||||||
use futures::future::{ok, Either, FutureResult};
|
use futures::future::{ok, Either, FutureResult};
|
||||||
use futures::{Async, Future, Poll};
|
use futures::{Async, Future, Poll};
|
||||||
use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
|
|
||||||
use trust_dns_resolver::lookup_ip::LookupIpFuture;
|
use trust_dns_resolver::lookup_ip::LookupIpFuture;
|
||||||
use trust_dns_resolver::system_conf::read_system_conf;
|
|
||||||
use trust_dns_resolver::{AsyncResolver, Background};
|
use trust_dns_resolver::{AsyncResolver, Background};
|
||||||
|
|
||||||
use crate::connect::{Address, Connect};
|
use crate::connect::{Address, Connect};
|
||||||
@ -19,28 +17,18 @@ pub struct ResolverFactory<T> {
|
|||||||
_t: PhantomData<T>,
|
_t: PhantomData<T>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Default for ResolverFactory<T> {
|
|
||||||
fn default() -> Self {
|
|
||||||
let (cfg, opts) = if let Ok((cfg, opts)) = read_system_conf() {
|
|
||||||
(cfg, opts)
|
|
||||||
} else {
|
|
||||||
(ResolverConfig::default(), ResolverOpts::default())
|
|
||||||
};
|
|
||||||
|
|
||||||
ResolverFactory::new(cfg, opts)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> ResolverFactory<T> {
|
impl<T> ResolverFactory<T> {
|
||||||
/// Create new resolver instance with custom configuration and options.
|
/// Create new resolver instance with custom configuration and options.
|
||||||
pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self {
|
pub fn new(resolver: AsyncResolver) -> Self {
|
||||||
let (resolver, bg) = AsyncResolver::new(cfg, opts);
|
|
||||||
tokio_current_thread::spawn(bg);
|
|
||||||
ResolverFactory {
|
ResolverFactory {
|
||||||
resolver,
|
resolver,
|
||||||
_t: PhantomData,
|
_t: PhantomData,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn resolver(&self) -> &AsyncResolver {
|
||||||
|
&self.resolver
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Clone for ResolverFactory<T> {
|
impl<T> Clone for ResolverFactory<T> {
|
||||||
@ -74,23 +62,9 @@ pub struct Resolver<T> {
|
|||||||
_t: PhantomData<T>,
|
_t: PhantomData<T>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Default for Resolver<T> {
|
|
||||||
fn default() -> Self {
|
|
||||||
let (cfg, opts) = if let Ok((cfg, opts)) = read_system_conf() {
|
|
||||||
(cfg, opts)
|
|
||||||
} else {
|
|
||||||
(ResolverConfig::default(), ResolverOpts::default())
|
|
||||||
};
|
|
||||||
|
|
||||||
Resolver::new(cfg, opts)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> Resolver<T> {
|
impl<T> Resolver<T> {
|
||||||
/// Create new resolver instance with custom configuration and options.
|
/// Create new resolver instance with custom configuration and options.
|
||||||
pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self {
|
pub fn new(resolver: AsyncResolver) -> Self {
|
||||||
let (resolver, bg) = AsyncResolver::new(cfg, opts);
|
|
||||||
tokio_current_thread::spawn(bg);
|
|
||||||
Resolver {
|
Resolver {
|
||||||
resolver,
|
resolver,
|
||||||
_t: PhantomData,
|
_t: PhantomData,
|
||||||
@ -136,13 +110,19 @@ impl<T: Address> Service for Resolver<T> {
|
|||||||
/// Resolver future
|
/// Resolver future
|
||||||
pub struct ResolverFuture<T: Address> {
|
pub struct ResolverFuture<T: Address> {
|
||||||
req: Option<Connect<T>>,
|
req: Option<Connect<T>>,
|
||||||
lookup: Option<Background<LookupIpFuture>>,
|
lookup: Background<LookupIpFuture>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Address> ResolverFuture<T> {
|
impl<T: Address> ResolverFuture<T> {
|
||||||
pub fn new(req: Connect<T>, resolver: &AsyncResolver) -> Self {
|
pub fn new(req: Connect<T>, resolver: &AsyncResolver) -> Self {
|
||||||
|
let lookup = if let Some(host) = req.host().splitn(2, ':').next() {
|
||||||
|
resolver.lookup_ip(host)
|
||||||
|
} else {
|
||||||
|
resolver.lookup_ip(req.host())
|
||||||
|
};
|
||||||
|
|
||||||
ResolverFuture {
|
ResolverFuture {
|
||||||
lookup: Some(resolver.lookup_ip(req.host())),
|
lookup,
|
||||||
req: Some(req),
|
req: Some(req),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -153,7 +133,7 @@ impl<T: Address> Future for ResolverFuture<T> {
|
|||||||
type Error = ConnectError;
|
type Error = ConnectError;
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
match self.lookup.as_mut().unwrap().poll().map_err(|e| {
|
match self.lookup.poll().map_err(|e| {
|
||||||
trace!(
|
trace!(
|
||||||
"DNS resolver: failed to resolve host {:?} err: {}",
|
"DNS resolver: failed to resolve host {:?} err: {}",
|
||||||
self.req.as_ref().unwrap().host(),
|
self.req.as_ref().unwrap().host(),
|
||||||
|
83
actix-connect/tests/test_connect.rs
Normal file
83
actix-connect/tests/test_connect.rs
Normal file
@ -0,0 +1,83 @@
|
|||||||
|
use actix_codec::{BytesCodec, Framed};
|
||||||
|
use actix_server_config::Io;
|
||||||
|
use actix_service::{fn_service, NewService, Service};
|
||||||
|
use actix_test_server::TestServer;
|
||||||
|
use bytes::Bytes;
|
||||||
|
use futures::{future::lazy, Future, Sink};
|
||||||
|
use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
|
||||||
|
|
||||||
|
use actix_connect::{default_connector, Connect};
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_string() {
|
||||||
|
let mut srv = TestServer::with(|| {
|
||||||
|
fn_service(|io: Io<tokio_tcp::TcpStream>| {
|
||||||
|
Framed::new(io.into_parts().0, BytesCodec)
|
||||||
|
.send(Bytes::from_static(b"test"))
|
||||||
|
.then(|_| Ok::<_, ()>(()))
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut conn = srv
|
||||||
|
.block_on(lazy(|| Ok::<_, ()>(default_connector())))
|
||||||
|
.unwrap();
|
||||||
|
let addr = format!("localhost:{}", srv.port());
|
||||||
|
let con = srv.block_on(conn.call(addr.into())).unwrap();
|
||||||
|
assert_eq!(con.peer_addr().unwrap(), srv.addr());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_static_str() {
|
||||||
|
let mut srv = TestServer::with(|| {
|
||||||
|
fn_service(|io: Io<tokio_tcp::TcpStream>| {
|
||||||
|
Framed::new(io.into_parts().0, BytesCodec)
|
||||||
|
.send(Bytes::from_static(b"test"))
|
||||||
|
.then(|_| Ok::<_, ()>(()))
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
let resolver = srv
|
||||||
|
.block_on(lazy(
|
||||||
|
|| Ok::<_, ()>(actix_connect::start_default_resolver()),
|
||||||
|
))
|
||||||
|
.unwrap();
|
||||||
|
let mut conn = srv
|
||||||
|
.block_on(lazy(|| Ok::<_, ()>(actix_connect::new_connector(resolver))))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let con = srv
|
||||||
|
.block_on(conn.call(Connect::with("10", srv.addr())))
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(con.peer_addr().unwrap(), srv.addr());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_new_service() {
|
||||||
|
let mut srv = TestServer::with(|| {
|
||||||
|
fn_service(|io: Io<tokio_tcp::TcpStream>| {
|
||||||
|
Framed::new(io.into_parts().0, BytesCodec)
|
||||||
|
.send(Bytes::from_static(b"test"))
|
||||||
|
.then(|_| Ok::<_, ()>(()))
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
let resolver = srv
|
||||||
|
.block_on(lazy(|| {
|
||||||
|
Ok::<_, ()>(actix_connect::start_resolver(
|
||||||
|
ResolverConfig::default(),
|
||||||
|
ResolverOpts::default(),
|
||||||
|
))
|
||||||
|
}))
|
||||||
|
.unwrap();
|
||||||
|
let factory = srv
|
||||||
|
.block_on(lazy(|| {
|
||||||
|
Ok::<_, ()>(actix_connect::new_connector_factory(resolver))
|
||||||
|
}))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let mut conn = srv.block_on(factory.new_service(&())).unwrap();
|
||||||
|
let con = srv
|
||||||
|
.block_on(conn.call(Connect::with("10", srv.addr())))
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(con.peer_addr().unwrap(), srv.addr());
|
||||||
|
}
|
@ -35,6 +35,7 @@ rust-tls = ["rustls", "tokio-rustls", "webpki", "webpki-roots"]
|
|||||||
[dependencies]
|
[dependencies]
|
||||||
actix-rt = "0.2.1"
|
actix-rt = "0.2.1"
|
||||||
actix-server = "0.4.0"
|
actix-server = "0.4.0"
|
||||||
|
actix-server-config = "0.1.0"
|
||||||
|
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
net2 = "0.2"
|
net2 = "0.2"
|
||||||
|
@ -4,6 +4,7 @@ use std::{net, thread};
|
|||||||
|
|
||||||
use actix_rt::{Runtime, System};
|
use actix_rt::{Runtime, System};
|
||||||
use actix_server::{Server, StreamServiceFactory};
|
use actix_server::{Server, StreamServiceFactory};
|
||||||
|
pub use actix_server_config::{Io, ServerConfig};
|
||||||
|
|
||||||
use futures::Future;
|
use futures::Future;
|
||||||
use net2::TcpBuilder;
|
use net2::TcpBuilder;
|
||||||
|
Loading…
Reference in New Issue
Block a user