use actix_service::{NewService, Service}; use futures::future::{ok, FutureResult}; use futures::{try_ready, Async, Future, Poll}; use tokio_tcp::TcpStream; use trust_dns_resolver::AsyncResolver; use crate::connect::{Address, Connect, Connection}; use crate::connector::{TcpConnector, TcpConnectorFactory}; use crate::error::ConnectError; use crate::resolver::{Resolver, ResolverFactory}; pub struct ConnectServiceFactory { tcp: TcpConnectorFactory, resolver: ResolverFactory, } impl ConnectServiceFactory { /// Construct new ConnectService factory pub fn new() -> Self { ConnectServiceFactory { tcp: TcpConnectorFactory::default(), resolver: ResolverFactory::default(), } } /// Construct new connect service with custom dns resolver pub fn with_resolver(resolver: AsyncResolver) -> Self { ConnectServiceFactory { tcp: TcpConnectorFactory::default(), resolver: ResolverFactory::new(resolver), } } /// Construct new service pub fn service(&self) -> ConnectService { ConnectService { tcp: self.tcp.service(), resolver: self.resolver.service(), } } /// Construct new tcp stream service pub fn tcp_service(&self) -> TcpConnectService { TcpConnectService { tcp: self.tcp.service(), resolver: self.resolver.service(), } } } impl Default for ConnectServiceFactory { fn default() -> Self { ConnectServiceFactory { tcp: TcpConnectorFactory::default(), resolver: ResolverFactory::default(), } } } impl Clone for ConnectServiceFactory { fn clone(&self) -> Self { ConnectServiceFactory { tcp: self.tcp.clone(), resolver: self.resolver.clone(), } } } impl NewService for ConnectServiceFactory { type Request = Connect; type Response = Connection; type Error = ConnectError; type Config = (); type Service = ConnectService; type InitError = (); type Future = FutureResult; fn new_service(&self, _: &()) -> Self::Future { ok(self.service()) } } #[derive(Clone)] pub struct ConnectService { tcp: TcpConnector, resolver: Resolver, } impl Service for ConnectService { type Request = Connect; type Response = Connection; type Error = ConnectError; type Future = ConnectServiceResponse; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) } fn call(&mut self, req: Connect) -> Self::Future { ConnectServiceResponse { fut1: Some(self.resolver.call(req)), fut2: None, tcp: self.tcp.clone(), } } } pub struct ConnectServiceResponse { fut1: Option< as Service>::Future>, fut2: Option< as Service>::Future>, tcp: TcpConnector, } impl Future for ConnectServiceResponse { type Item = Connection; type Error = ConnectError; fn poll(&mut self) -> Poll { if let Some(ref mut fut) = self.fut1 { let res = try_ready!(fut.poll()); let _ = self.fut1.take(); self.fut2 = Some(self.tcp.call(res)); } if let Some(ref mut fut) = self.fut2 { return fut.poll(); } Ok(Async::NotReady) } } #[derive(Clone)] pub struct TcpConnectService { tcp: TcpConnector, resolver: Resolver, } impl Service for TcpConnectService { type Request = Connect; type Response = TcpStream; type Error = ConnectError; type Future = TcpConnectServiceResponse; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) } fn call(&mut self, req: Connect) -> Self::Future { TcpConnectServiceResponse { fut1: Some(self.resolver.call(req)), fut2: None, tcp: self.tcp.clone(), } } } pub struct TcpConnectServiceResponse { fut1: Option< as Service>::Future>, fut2: Option< as Service>::Future>, tcp: TcpConnector, } impl Future for TcpConnectServiceResponse { type Item = TcpStream; type Error = ConnectError; fn poll(&mut self) -> Poll { if let Some(ref mut fut) = self.fut1 { let res = try_ready!(fut.poll()); let _ = self.fut1.take(); self.fut2 = Some(self.tcp.call(res)); } if let Some(ref mut fut) = self.fut2 { if let Async::Ready(conn) = fut.poll()? { return Ok(Async::Ready(conn.into_parts().0)); } } Ok(Async::NotReady) } }