diff --git a/actix-tls/CHANGES.md b/actix-tls/CHANGES.md index 400b1763..067c4fe8 100644 --- a/actix-tls/CHANGES.md +++ b/actix-tls/CHANGES.md @@ -7,10 +7,13 @@ * Remove `connect::ssl::openssl::OpensslConnectService`. [#297] * Add `connect::ssl::native_tls` module for native tls support. [#295] * Rename `accept::{nativetls => native_tls}`. [#295] +* Remove `connect::TcpConnectService` type. service caller expect a `TcpStream` should use + `connect::ConnectService` instead and call `Connection::into_parts`. [#299] [#295]: https://github.com/actix/actix-net/pull/295 [#296]: https://github.com/actix/actix-net/pull/296 [#297]: https://github.com/actix/actix-net/pull/297 +[#299]: https://github.com/actix/actix-net/pull/299 ## 3.0.0-beta.4 - 2021-02-24 diff --git a/actix-tls/src/connect/connector.rs b/actix-tls/src/connect/connector.rs index 8f32270f..9438404e 100755 --- a/actix-tls/src/connect/connector.rs +++ b/actix-tls/src/connect/connector.rs @@ -72,7 +72,7 @@ pub enum TcpConnectorResponse { port: u16, local_addr: Option, addrs: Option>, - stream: Option>>, + stream: ReusableBoxFuture>, }, Error(Option), } @@ -103,18 +103,22 @@ impl TcpConnectorResponse { port, local_addr, addrs: None, - stream: Some(ReusableBoxFuture::new(connect(addr, local_addr))), + stream: ReusableBoxFuture::new(connect(addr, local_addr)), }, // when resolver returns multiple socket addr for request they would be popped from // front end of queue and returns with the first successful tcp connection. - ConnectAddrs::Multi(addrs) => TcpConnectorResponse::Response { - req: Some(req), - port, - local_addr, - addrs: Some(addrs), - stream: None, - }, + ConnectAddrs::Multi(mut addrs) => { + let addr = addrs.pop_front().unwrap(); + + TcpConnectorResponse::Response { + req: Some(req), + port, + local_addr, + addrs: Some(addrs), + stream: ReusableBoxFuture::new(connect(addr, local_addr)), + } + } } } } @@ -133,40 +137,31 @@ impl Future for TcpConnectorResponse { addrs, stream, } => loop { - if let Some(new) = stream.as_mut() { - match ready!(new.poll(cx)) { - Ok(sock) => { - let req = req.take().unwrap(); - trace!( - "TCP connector: successfully connected to {:?} - {:?}", - req.hostname(), - sock.peer_addr() - ); - return Poll::Ready(Ok(Connection::new(sock, req))); - } + match ready!(stream.poll(cx)) { + Ok(sock) => { + let req = req.take().unwrap(); + trace!( + "TCP connector: successfully connected to {:?} - {:?}", + req.hostname(), + sock.peer_addr() + ); + return Poll::Ready(Ok(Connection::new(sock, req))); + } - Err(err) => { - trace!( - "TCP connector: failed to connect to {:?} port: {}", - req.as_ref().unwrap().hostname(), - port, - ); + Err(err) => { + trace!( + "TCP connector: failed to connect to {:?} port: {}", + req.as_ref().unwrap().hostname(), + port, + ); - if addrs.is_none() || addrs.as_ref().unwrap().is_empty() { - return Poll::Ready(Err(ConnectError::Io(err))); - } + if let Some(addr) = addrs.as_mut().and_then(|addrs| addrs.pop_front()) { + stream.set(connect(addr, *local_addr)); + } else { + return Poll::Ready(Err(ConnectError::Io(err))); } } } - - // try to connect - let addr = addrs.as_mut().unwrap().pop_front().unwrap(); - - let fut = connect(addr, *local_addr); - match stream { - Some(rbf) => rbf.set(fut), - None => *stream = Some(ReusableBoxFuture::new(fut)), - } }, } } diff --git a/actix-tls/src/connect/mod.rs b/actix-tls/src/connect/mod.rs index 4010e3cb..ad4f40a3 100644 --- a/actix-tls/src/connect/mod.rs +++ b/actix-tls/src/connect/mod.rs @@ -26,20 +26,20 @@ pub mod ssl; mod uri; use actix_rt::net::TcpStream; -use actix_service::{pipeline, pipeline_factory, Service, ServiceFactory}; +use actix_service::{Service, ServiceFactory}; pub use self::connect::{Address, Connect, Connection}; pub use self::connector::{TcpConnector, TcpConnectorFactory}; pub use self::error::ConnectError; pub use self::resolve::{Resolve, Resolver, ResolverFactory}; -pub use self::service::{ConnectService, ConnectServiceFactory, TcpConnectService}; +pub use self::service::{ConnectService, ConnectServiceFactory}; /// Create TCP connector service. pub fn new_connector( resolver: Resolver, ) -> impl Service, Response = Connection, Error = ConnectError> + Clone { - pipeline(resolver).and_then(TcpConnector) + ConnectServiceFactory::new(resolver).service() } /// Create TCP connector service factory. @@ -52,7 +52,7 @@ pub fn new_connector_factory( Error = ConnectError, InitError = (), > + Clone { - pipeline_factory(ResolverFactory::new(resolver)).and_then(TcpConnectorFactory) + ConnectServiceFactory::new(resolver) } /// Create connector service with default parameters. diff --git a/actix-tls/src/connect/service.rs b/actix-tls/src/connect/service.rs index 98765ca1..9961498e 100755 --- a/actix-tls/src/connect/service.rs +++ b/actix-tls/src/connect/service.rs @@ -34,14 +34,6 @@ impl ConnectServiceFactory { resolver: self.resolver.service(), } } - - /// Construct new tcp stream service - pub fn tcp_service(&self) -> TcpConnectService { - TcpConnectService { - tcp: self.tcp.service(), - resolver: self.resolver.service(), - } - } } impl Clone for ConnectServiceFactory { @@ -63,7 +55,7 @@ impl ServiceFactory> for ConnectServiceFactory { fn new_service(&self, _: ()) -> Self::Future { let service = self.service(); - Box::pin(async move { Ok(service) }) + Box::pin(async { Ok(service) }) } } @@ -135,44 +127,3 @@ impl Future for ConnectServiceResponse { } } } - -#[derive(Clone)] -pub struct TcpConnectService { - tcp: TcpConnector, - resolver: Resolver, -} - -impl Service> for TcpConnectService { - type Response = TcpStream; - type Error = ConnectError; - type Future = TcpConnectServiceResponse; - - actix_service::always_ready!(); - - fn call(&self, req: Connect) -> Self::Future { - TcpConnectServiceResponse { - fut: ConnectFuture::Resolve(self.resolver.call(req)), - tcp: self.tcp, - } - } -} - -pub struct TcpConnectServiceResponse { - fut: ConnectFuture, - tcp: TcpConnector, -} - -impl Future for TcpConnectServiceResponse { - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - loop { - match ready!(self.fut.poll_connect(cx))? { - ConnectOutput::Resolved(res) => { - self.fut = ConnectFuture::Connect(self.tcp.call(res)); - } - ConnectOutput::Connected(conn) => return Poll::Ready(Ok(conn.into_parts().0)), - } - } - } -}