1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-30 18:44:36 +01:00

Remove unused TcpConnectService (#299)

This commit is contained in:
fakeshadow 2021-03-27 14:03:24 -07:00 committed by GitHub
parent bb27bac216
commit 4544562e1b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 41 additions and 92 deletions

View File

@ -7,10 +7,13 @@
* Remove `connect::ssl::openssl::OpensslConnectService`. [#297] * Remove `connect::ssl::openssl::OpensslConnectService`. [#297]
* Add `connect::ssl::native_tls` module for native tls support. [#295] * Add `connect::ssl::native_tls` module for native tls support. [#295]
* Rename `accept::{nativetls => native_tls}`. [#295] * Rename `accept::{nativetls => native_tls}`. [#295]
* Remove `connect::TcpConnectService` type. service caller expect a `TcpStream` should use
`connect::ConnectService` instead and call `Connection<T, TcpStream>::into_parts`. [#299]
[#295]: https://github.com/actix/actix-net/pull/295 [#295]: https://github.com/actix/actix-net/pull/295
[#296]: https://github.com/actix/actix-net/pull/296 [#296]: https://github.com/actix/actix-net/pull/296
[#297]: https://github.com/actix/actix-net/pull/297 [#297]: https://github.com/actix/actix-net/pull/297
[#299]: https://github.com/actix/actix-net/pull/299
## 3.0.0-beta.4 - 2021-02-24 ## 3.0.0-beta.4 - 2021-02-24

View File

@ -72,7 +72,7 @@ pub enum TcpConnectorResponse<T> {
port: u16, port: u16,
local_addr: Option<IpAddr>, local_addr: Option<IpAddr>,
addrs: Option<VecDeque<SocketAddr>>, addrs: Option<VecDeque<SocketAddr>>,
stream: Option<ReusableBoxFuture<Result<TcpStream, io::Error>>>, stream: ReusableBoxFuture<Result<TcpStream, io::Error>>,
}, },
Error(Option<ConnectError>), Error(Option<ConnectError>),
} }
@ -103,18 +103,22 @@ impl<T: Address> TcpConnectorResponse<T> {
port, port,
local_addr, local_addr,
addrs: None, addrs: None,
stream: Some(ReusableBoxFuture::new(connect(addr, local_addr))), stream: ReusableBoxFuture::new(connect(addr, local_addr)),
}, },
// when resolver returns multiple socket addr for request they would be popped from // when resolver returns multiple socket addr for request they would be popped from
// front end of queue and returns with the first successful tcp connection. // front end of queue and returns with the first successful tcp connection.
ConnectAddrs::Multi(addrs) => TcpConnectorResponse::Response { ConnectAddrs::Multi(mut addrs) => {
let addr = addrs.pop_front().unwrap();
TcpConnectorResponse::Response {
req: Some(req), req: Some(req),
port, port,
local_addr, local_addr,
addrs: Some(addrs), addrs: Some(addrs),
stream: None, stream: ReusableBoxFuture::new(connect(addr, local_addr)),
}, }
}
} }
} }
} }
@ -133,8 +137,7 @@ impl<T: Address> Future for TcpConnectorResponse<T> {
addrs, addrs,
stream, stream,
} => loop { } => loop {
if let Some(new) = stream.as_mut() { match ready!(stream.poll(cx)) {
match ready!(new.poll(cx)) {
Ok(sock) => { Ok(sock) => {
let req = req.take().unwrap(); let req = req.take().unwrap();
trace!( trace!(
@ -152,21 +155,13 @@ impl<T: Address> Future for TcpConnectorResponse<T> {
port, port,
); );
if addrs.is_none() || addrs.as_ref().unwrap().is_empty() { if let Some(addr) = addrs.as_mut().and_then(|addrs| addrs.pop_front()) {
stream.set(connect(addr, *local_addr));
} else {
return Poll::Ready(Err(ConnectError::Io(err))); return Poll::Ready(Err(ConnectError::Io(err)));
} }
} }
} }
}
// try to connect
let addr = addrs.as_mut().unwrap().pop_front().unwrap();
let fut = connect(addr, *local_addr);
match stream {
Some(rbf) => rbf.set(fut),
None => *stream = Some(ReusableBoxFuture::new(fut)),
}
}, },
} }
} }

View File

@ -26,20 +26,20 @@ pub mod ssl;
mod uri; mod uri;
use actix_rt::net::TcpStream; use actix_rt::net::TcpStream;
use actix_service::{pipeline, pipeline_factory, Service, ServiceFactory}; use actix_service::{Service, ServiceFactory};
pub use self::connect::{Address, Connect, Connection}; pub use self::connect::{Address, Connect, Connection};
pub use self::connector::{TcpConnector, TcpConnectorFactory}; pub use self::connector::{TcpConnector, TcpConnectorFactory};
pub use self::error::ConnectError; pub use self::error::ConnectError;
pub use self::resolve::{Resolve, Resolver, ResolverFactory}; pub use self::resolve::{Resolve, Resolver, ResolverFactory};
pub use self::service::{ConnectService, ConnectServiceFactory, TcpConnectService}; pub use self::service::{ConnectService, ConnectServiceFactory};
/// Create TCP connector service. /// Create TCP connector service.
pub fn new_connector<T: Address + 'static>( pub fn new_connector<T: Address + 'static>(
resolver: Resolver, resolver: Resolver,
) -> impl Service<Connect<T>, Response = Connection<T, TcpStream>, Error = ConnectError> + Clone ) -> impl Service<Connect<T>, Response = Connection<T, TcpStream>, Error = ConnectError> + Clone
{ {
pipeline(resolver).and_then(TcpConnector) ConnectServiceFactory::new(resolver).service()
} }
/// Create TCP connector service factory. /// Create TCP connector service factory.
@ -52,7 +52,7 @@ pub fn new_connector_factory<T: Address + 'static>(
Error = ConnectError, Error = ConnectError,
InitError = (), InitError = (),
> + Clone { > + Clone {
pipeline_factory(ResolverFactory::new(resolver)).and_then(TcpConnectorFactory) ConnectServiceFactory::new(resolver)
} }
/// Create connector service with default parameters. /// Create connector service with default parameters.

View File

@ -34,14 +34,6 @@ impl ConnectServiceFactory {
resolver: self.resolver.service(), resolver: self.resolver.service(),
} }
} }
/// Construct new tcp stream service
pub fn tcp_service(&self) -> TcpConnectService {
TcpConnectService {
tcp: self.tcp.service(),
resolver: self.resolver.service(),
}
}
} }
impl Clone for ConnectServiceFactory { impl Clone for ConnectServiceFactory {
@ -63,7 +55,7 @@ impl<T: Address> ServiceFactory<Connect<T>> for ConnectServiceFactory {
fn new_service(&self, _: ()) -> Self::Future { fn new_service(&self, _: ()) -> Self::Future {
let service = self.service(); let service = self.service();
Box::pin(async move { Ok(service) }) Box::pin(async { Ok(service) })
} }
} }
@ -135,44 +127,3 @@ impl<T: Address> Future for ConnectServiceResponse<T> {
} }
} }
} }
#[derive(Clone)]
pub struct TcpConnectService {
tcp: TcpConnector,
resolver: Resolver,
}
impl<T: Address> Service<Connect<T>> for TcpConnectService {
type Response = TcpStream;
type Error = ConnectError;
type Future = TcpConnectServiceResponse<T>;
actix_service::always_ready!();
fn call(&self, req: Connect<T>) -> Self::Future {
TcpConnectServiceResponse {
fut: ConnectFuture::Resolve(self.resolver.call(req)),
tcp: self.tcp,
}
}
}
pub struct TcpConnectServiceResponse<T: Address> {
fut: ConnectFuture<T>,
tcp: TcpConnector,
}
impl<T: Address> Future for TcpConnectServiceResponse<T> {
type Output = Result<TcpStream, ConnectError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match ready!(self.fut.poll_connect(cx))? {
ConnectOutput::Resolved(res) => {
self.fut = ConnectFuture::Connect(self.tcp.call(res));
}
ConnectOutput::Connected(conn) => return Poll::Ready(Ok(conn.into_parts().0)),
}
}
}
}