From 033a8d890cc276ee4ebdbbbd614aa2c20d085c49 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 13 Mar 2019 15:57:33 -0700 Subject: [PATCH] update actix connect --- Cargo.toml | 3 +-- src/client/connect.rs | 7 +++++-- src/client/connector.rs | 32 ++++++++++++++++---------------- src/ws/client/service.rs | 16 ++++++++-------- 4 files changed, 30 insertions(+), 28 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ee9d28ac7..11f532c89 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,8 +40,7 @@ fail = ["failure"] [dependencies] actix-service = "0.3.4" actix-codec = "0.1.1" -#actix-connector = "0.3.0" -actix-connect = { path="../actix-net/actix-connect" } +actix-connect = { git = "https://github.com/actix/actix-net.git" } actix-utils = "0.3.4" actix-server-config = "0.1.0" diff --git a/src/client/connect.rs b/src/client/connect.rs index 43be57703..93626b0a7 100644 --- a/src/client/connect.rs +++ b/src/client/connect.rs @@ -1,3 +1,4 @@ +use actix_connect::Address; use http::uri::Uri; use http::HttpTryFrom; @@ -53,12 +54,14 @@ impl Connect { Ok(()) } } +} - pub(crate) fn host(&self) -> &str { +impl Address for Connect { + fn host(&self) -> &str { &self.uri.host().unwrap() } - pub(crate) fn port(&self) -> u16 { + fn port(&self) -> u16 { if let Some(port) = self.uri.port() { port } else if let Some(scheme) = self.uri.scheme_part() { diff --git a/src/client/connector.rs b/src/client/connector.rs index 1579cd5ef..aaa88abc0 100644 --- a/src/client/connector.rs +++ b/src/client/connector.rs @@ -3,7 +3,9 @@ use std::marker::PhantomData; use std::time::Duration; use actix_codec::{AsyncRead, AsyncWrite}; -use actix_connect::{default_connector, Stream}; +use actix_connect::{ + default_connector, Connect as TcpConnect, Connection as TcpConnection, +}; use actix_service::{apply_fn, Service, ServiceExt}; use actix_utils::timeout::{TimeoutError, TimeoutService}; use tokio_tcp::TcpStream; @@ -36,8 +38,8 @@ pub struct Connector { impl Connector<(), ()> { pub fn new() -> Connector< impl Service< - Request = actix_connect::Connect, - Response = Stream, + Request = TcpConnect, + Response = TcpConnection, Error = actix_connect::ConnectError, > + Clone, TcpStream, @@ -77,8 +79,8 @@ impl Connector { where U1: AsyncRead + AsyncWrite + fmt::Debug, T1: Service< - Request = actix_connect::Connect, - Response = Stream, + Request = TcpConnect, + Response = TcpConnection, Error = actix_connect::ConnectError, > + Clone, { @@ -99,8 +101,8 @@ impl Connector where U: AsyncRead + AsyncWrite + fmt::Debug + 'static, T: Service< - Request = actix_connect::Connect, - Response = Stream, + Request = TcpConnect, + Response = TcpConnection, Error = actix_connect::ConnectError, > + Clone, { @@ -170,8 +172,10 @@ where { let connector = TimeoutService::new( self.timeout, - self.connector - .map(|stream| (stream.into_parts().0, Protocol::Http1)), + apply_fn(self.connector, |msg: Connect, srv| { + srv.call(actix_connect::Connect::with_request(msg)) + }) + .map(|stream| (stream.into_parts().0, Protocol::Http1)), ) .map_err(|e| match e { TimeoutError::Service(e) => e, @@ -196,7 +200,7 @@ where let ssl_service = TimeoutService::new( self.timeout, apply_fn(self.connector.clone(), |msg: Connect, srv| { - srv.call(actix_connect::Connect::new(msg.host(), msg.port())) + srv.call(actix_connect::Connect::with_request(msg)) }) .map_err(ConnectError::from) .and_then( @@ -226,7 +230,7 @@ where let tcp_service = TimeoutService::new( self.timeout, apply_fn(self.connector.clone(), |msg: Connect, srv| { - srv.call(actix_connect::Connect::new(msg.host(), msg.port())) + srv.call(actix_connect::Connect::with_request(msg)) }) .map_err(ConnectError::from) .map(|stream| (stream.into_parts().0, Protocol::Http1)), @@ -267,11 +271,7 @@ mod connect_impl { pub(crate) struct InnerConnector where Io: AsyncRead + AsyncWrite + 'static, - T: Service< - Request = Connect, - Response = (Connect, Io, Protocol), - Error = ConnectorError, - >, + T: Service, { pub(crate) tcp_pool: ConnectionPool, } diff --git a/src/ws/client/service.rs b/src/ws/client/service.rs index e3781e15f..7be30993b 100644 --- a/src/ws/client/service.rs +++ b/src/ws/client/service.rs @@ -2,7 +2,7 @@ use std::marker::PhantomData; use actix_codec::{AsyncRead, AsyncWrite, Framed}; -use actix_connect::{default_connector, Connect as TcpConnect, ConnectError}; +use actix_connect::{default_connector, Address, Connect as TcpConnect, ConnectError}; use actix_service::{apply_fn, Service}; use base64; use futures::future::{err, Either, FutureResult}; @@ -29,12 +29,12 @@ impl Client<()> { /// Create client with default connector. pub fn default() -> Client< impl Service< - Request = TcpConnect, + Request = TcpConnect<(String, u16)>, Response = impl AsyncRead + AsyncWrite, Error = ConnectError, > + Clone, > { - Client::new(apply_fn(default_connector(), |msg: TcpConnect, srv| { + Client::new(apply_fn(default_connector(), |msg: TcpConnect<_>, srv| { srv.call(msg).map(|stream| stream.into_parts().0) })) } @@ -42,7 +42,7 @@ impl Client<()> { impl Client where - T: Service, + T: Service, Error = ConnectError>, T::Response: AsyncRead + AsyncWrite, { /// Create new websocket's client factory @@ -53,7 +53,7 @@ where impl Clone for Client where - T: Service + Clone, + T: Service, Error = ConnectError> + Clone, T::Response: AsyncRead + AsyncWrite, { fn clone(&self) -> Self { @@ -65,7 +65,7 @@ where impl Service for Client where - T: Service, + T: Service, Error = ConnectError>, T::Response: AsyncRead + AsyncWrite + 'static, T::Future: 'static, { @@ -130,8 +130,8 @@ where ); // prep connection - let connect = TcpConnect::new( - request.uri().host().unwrap(), + let connect = TcpConnect::from_string( + request.uri().host().unwrap().to_string(), request.uri().port().unwrap_or_else(|| proto.port()), );