diff --git a/actix-connect/CHANGES.md b/actix-connect/CHANGES.md index 91a1f8ea..95ac002e 100644 --- a/actix-connect/CHANGES.md +++ b/actix-connect/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.2.5] - 2019-09-05 + +* Add `TcpConnectService` + ## [0.2.4] - 2019-09-02 * Use arbiter's storage for default async resolver diff --git a/actix-connect/Cargo.toml b/actix-connect/Cargo.toml index f81c1c6c..afa57a93 100644 --- a/actix-connect/Cargo.toml +++ b/actix-connect/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-connect" -version = "0.2.4" +version = "0.2.5" authors = ["Nikolay Kim "] description = "Actix Connector - tcp connector service" keywords = ["network", "framework", "async", "futures"] diff --git a/actix-connect/src/lib.rs b/actix-connect/src/lib.rs index ca09d34a..dbeff07c 100644 --- a/actix-connect/src/lib.rs +++ b/actix-connect/src/lib.rs @@ -28,7 +28,7 @@ pub use self::connect::{Address, Connect, Connection}; pub use self::connector::{TcpConnector, TcpConnectorFactory}; pub use self::error::ConnectError; pub use self::resolver::{Resolver, ResolverFactory}; -pub use self::service::{ConnectService, ConnectServiceFactory}; +pub use self::service::{ConnectService, ConnectServiceFactory, TcpConnectService}; use actix_rt::Arbiter; use actix_service::{NewService, Service, ServiceExt}; diff --git a/actix-connect/src/service.rs b/actix-connect/src/service.rs index 55fbc78a..8bd3cf0d 100644 --- a/actix-connect/src/service.rs +++ b/actix-connect/src/service.rs @@ -38,6 +38,14 @@ impl ConnectServiceFactory { resolver: self.resolver.service(), } } + + /// Construct new tcp stream service + pub fn tcp_service(&self) -> TcpConnectService { + TcpConnectService { + tcp: self.tcp.service(), + resolver: self.resolver.service(), + } + } } impl Default for ConnectServiceFactory { @@ -121,3 +129,55 @@ impl Future for ConnectServiceResponse { Ok(Async::NotReady) } } + +#[derive(Clone)] +pub struct TcpConnectService { + tcp: TcpConnector, + resolver: Resolver, +} + +impl Service for TcpConnectService { + type Request = Connect; + type Response = TcpStream; + type Error = ConnectError; + type Future = TcpConnectServiceResponse; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(Async::Ready(())) + } + + fn call(&mut self, req: Connect) -> Self::Future { + TcpConnectServiceResponse { + fut1: Some(self.resolver.call(req)), + fut2: None, + tcp: self.tcp.clone(), + } + } +} + +pub struct TcpConnectServiceResponse { + fut1: Option< as Service>::Future>, + fut2: Option< as Service>::Future>, + tcp: TcpConnector, +} + +impl Future for TcpConnectServiceResponse { + type Item = TcpStream; + type Error = ConnectError; + + fn poll(&mut self) -> Poll { + if let Some(ref mut fut) = self.fut1 { + let res = try_ready!(fut.poll()); + let _ = self.fut1.take(); + self.fut2 = Some(self.tcp.call(res)); + } + + if let Some(ref mut fut) = self.fut2 { + if let Async::Ready(conn) = fut.poll()? { + return Ok(Async::Ready(conn.into_parts().0)); + } + } + + Ok(Async::NotReady) + } +}