1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-02-01 00:20:06 +01:00
actix-net/actix-connect/src/connector.rs

167 lines
4.8 KiB
Rust
Raw Normal View History

2018-08-23 20:47:41 -07:00
use std::collections::VecDeque;
2019-03-13 12:40:11 -07:00
use std::net::SocketAddr;
2018-08-23 20:47:41 -07:00
2019-03-13 12:40:11 -07:00
use actix_service::{NewService, Service};
use futures::future::{err, ok, Either, FutureResult};
use futures::{Async, Future, Poll};
2018-08-23 20:47:41 -07:00
use tokio_tcp::{ConnectFuture, TcpStream};
2019-03-13 12:40:11 -07:00
use super::connect::{Connect, Stream};
use super::error::ConnectError;
2018-08-28 16:24:36 -07:00
2019-03-13 12:40:11 -07:00
/// Factory that produces [`Connector`] services.
///
/// The factory carries no state, so it is a zero-sized, freely copyable value.
#[derive(Clone, Copy, Debug)]
pub struct ConnectorFactory;
2018-08-23 20:47:41 -07:00
2019-03-13 12:40:11 -07:00
impl NewService for ConnectorFactory {
type Request = Connect;
type Response = Stream<TcpStream>;
type Error = ConnectError;
type Service = Connector;
type InitError = ();
type Future = FutureResult<Self::Service, Self::InitError>;
2018-09-10 19:42:51 -07:00
2019-03-13 12:40:11 -07:00
fn new_service(&self, _: &()) -> Self::Future {
ok(Connector)
2018-11-29 17:17:02 -10:00
}
2018-08-27 20:32:49 -07:00
}
2019-03-13 12:40:11 -07:00
/// Service that opens a TCP connection for an already resolved address.
///
/// The service carries no state, so it is a zero-sized, freely copyable value.
#[derive(Clone, Copy, Debug)]
pub struct Connector;
2018-08-23 20:47:41 -07:00
2019-03-09 07:27:56 -08:00
impl Service for Connector {
type Request = Connect;
2019-03-13 12:40:11 -07:00
type Response = Stream<TcpStream>;
type Error = ConnectError;
type Future = Either<ConnectorResponse, FutureResult<Self::Response, Self::Error>>;
2018-08-23 20:47:41 -07:00
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
2018-11-29 16:56:15 -10:00
fn call(&mut self, req: Connect) -> Self::Future {
2019-03-13 12:40:11 -07:00
match req {
Connect::Host { .. } => {
error!("TCP connector: got unresolved address");
Either::B(err(ConnectError::Unresolverd))
2018-11-21 08:07:04 -10:00
}
2019-03-13 12:40:11 -07:00
Connect::Addr { host, addr } => Either::A(ConnectorResponse::new(host, addr)),
2018-09-10 19:39:55 -07:00
}
2018-08-23 20:47:41 -07:00
}
}
2018-09-11 14:01:51 -07:00
#[doc(hidden)]
/// Tcp stream connector response future
2019-03-13 12:40:11 -07:00
pub struct ConnectorResponse {
host: Option<String>,
addrs: Option<VecDeque<SocketAddr>>,
2018-08-23 20:47:41 -07:00
stream: Option<ConnectFuture>,
}
2019-03-13 12:40:11 -07:00
impl ConnectorResponse {
    /// Creates the response future for `host`.
    ///
    /// With a single address (`Left`) the connection attempt is started right
    /// away. With a list of addresses (`Right`) nothing is started here; the
    /// list is stored and attempts are made when the future is polled.
    pub fn new(
        host: String,
        addr: either::Either<SocketAddr, VecDeque<SocketAddr>>,
    ) -> ConnectorResponse {
        trace!("TCP connector - connecting to {:?}", host);

        let (addrs, stream) = match addr {
            either::Either::Left(single) => (None, Some(TcpStream::connect(&single))),
            either::Either::Right(candidates) => (Some(candidates), None),
        };

        ConnectorResponse {
            host: Some(host),
            addrs,
            stream,
        }
    }
}
2019-03-13 12:40:11 -07:00
impl Future for ConnectorResponse {
type Item = Stream<TcpStream>;
type Error = ConnectError;
2018-08-23 20:47:41 -07:00
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// connect
loop {
if let Some(new) = self.stream.as_mut() {
match new.poll() {
2018-08-28 16:24:36 -07:00
Ok(Async::Ready(sock)) => {
2019-03-13 12:40:11 -07:00
let host = self.host.take().unwrap();
trace!(
"TCP connector - successfully connected to connecting to {:?} - {:?}",
host, sock.peer_addr()
);
2019-03-13 14:38:33 -07:00
return Ok(Async::Ready(Stream::new(sock, host)));
2018-08-28 16:24:36 -07:00
}
2018-08-23 20:47:41 -07:00
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => {
2019-03-13 12:40:11 -07:00
trace!(
"TCP connector - failed to connect to connecting to {:?}",
self.host.as_ref().unwrap()
);
if self.addrs.as_ref().unwrap().is_empty() {
return Err(err.into());
2018-08-23 20:47:41 -07:00
}
}
}
}
// try to connect
2019-03-13 12:40:11 -07:00
self.stream = Some(TcpStream::connect(
&self.addrs.as_mut().unwrap().pop_front().unwrap(),
));
2018-10-23 22:26:16 -07:00
}
}
}
2019-03-13 12:40:11 -07:00
// #[derive(Clone)]
// pub struct DefaultConnector(Connector);
// impl Default for DefaultConnector {
// fn default() -> Self {
// DefaultConnector(Connector::default())
// }
// }
// impl DefaultConnector {
// pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self {
// DefaultConnector(Connector::new(cfg, opts))
// }
// }
// impl Service for DefaultConnector {
// type Request = Connect;
// type Response = TcpStream;
// type Error = ConnectorError;
// type Future = DefaultConnectorFuture;
// fn poll_ready(&mut self) -> Poll<(), Self::Error> {
// self.0.poll_ready()
// }
// fn call(&mut self, req: Connect) -> Self::Future {
// DefaultConnectorFuture {
// fut: self.0.call(req),
// }
// }
// }
// #[doc(hidden)]
// pub struct DefaultConnectorFuture {
// fut: Either<ConnectorFuture, ConnectorTcpFuture>,
// }
// impl Future for DefaultConnectorFuture {
// type Item = TcpStream;
// type Error = ConnectorError;
// fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// Ok(Async::Ready(try_ready!(self.fut.poll()).1))
// }
// }