1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-24 07:53:00 +01:00

update actix connect

This commit is contained in:
Nikolay Kim 2019-03-13 15:57:33 -07:00
parent 1941aa0217
commit 033a8d890c
4 changed files with 30 additions and 28 deletions

View File

@ -40,8 +40,7 @@ fail = ["failure"]
[dependencies] [dependencies]
actix-service = "0.3.4" actix-service = "0.3.4"
actix-codec = "0.1.1" actix-codec = "0.1.1"
#actix-connector = "0.3.0" actix-connect = { git = "https://github.com/actix/actix-net.git" }
actix-connect = { path="../actix-net/actix-connect" }
actix-utils = "0.3.4" actix-utils = "0.3.4"
actix-server-config = "0.1.0" actix-server-config = "0.1.0"

View File

@ -1,3 +1,4 @@
use actix_connect::Address;
use http::uri::Uri; use http::uri::Uri;
use http::HttpTryFrom; use http::HttpTryFrom;
@ -53,12 +54,14 @@ impl Connect {
Ok(()) Ok(())
} }
} }
}
pub(crate) fn host(&self) -> &str { impl Address for Connect {
fn host(&self) -> &str {
&self.uri.host().unwrap() &self.uri.host().unwrap()
} }
pub(crate) fn port(&self) -> u16 { fn port(&self) -> u16 {
if let Some(port) = self.uri.port() { if let Some(port) = self.uri.port() {
port port
} else if let Some(scheme) = self.uri.scheme_part() { } else if let Some(scheme) = self.uri.scheme_part() {

View File

@ -3,7 +3,9 @@ use std::marker::PhantomData;
use std::time::Duration; use std::time::Duration;
use actix_codec::{AsyncRead, AsyncWrite}; use actix_codec::{AsyncRead, AsyncWrite};
use actix_connect::{default_connector, Stream}; use actix_connect::{
default_connector, Connect as TcpConnect, Connection as TcpConnection,
};
use actix_service::{apply_fn, Service, ServiceExt}; use actix_service::{apply_fn, Service, ServiceExt};
use actix_utils::timeout::{TimeoutError, TimeoutService}; use actix_utils::timeout::{TimeoutError, TimeoutService};
use tokio_tcp::TcpStream; use tokio_tcp::TcpStream;
@ -36,8 +38,8 @@ pub struct Connector<T, U> {
impl Connector<(), ()> { impl Connector<(), ()> {
pub fn new() -> Connector< pub fn new() -> Connector<
impl Service< impl Service<
Request = actix_connect::Connect, Request = TcpConnect<Connect>,
Response = Stream<TcpStream>, Response = TcpConnection<Connect, TcpStream>,
Error = actix_connect::ConnectError, Error = actix_connect::ConnectError,
> + Clone, > + Clone,
TcpStream, TcpStream,
@ -77,8 +79,8 @@ impl<T, U> Connector<T, U> {
where where
U1: AsyncRead + AsyncWrite + fmt::Debug, U1: AsyncRead + AsyncWrite + fmt::Debug,
T1: Service< T1: Service<
Request = actix_connect::Connect, Request = TcpConnect<Connect>,
Response = Stream<U1>, Response = TcpConnection<Connect, U1>,
Error = actix_connect::ConnectError, Error = actix_connect::ConnectError,
> + Clone, > + Clone,
{ {
@ -99,8 +101,8 @@ impl<T, U> Connector<T, U>
where where
U: AsyncRead + AsyncWrite + fmt::Debug + 'static, U: AsyncRead + AsyncWrite + fmt::Debug + 'static,
T: Service< T: Service<
Request = actix_connect::Connect, Request = TcpConnect<Connect>,
Response = Stream<U>, Response = TcpConnection<Connect, U>,
Error = actix_connect::ConnectError, Error = actix_connect::ConnectError,
> + Clone, > + Clone,
{ {
@ -170,7 +172,9 @@ where
{ {
let connector = TimeoutService::new( let connector = TimeoutService::new(
self.timeout, self.timeout,
self.connector apply_fn(self.connector, |msg: Connect, srv| {
srv.call(actix_connect::Connect::with_request(msg))
})
.map(|stream| (stream.into_parts().0, Protocol::Http1)), .map(|stream| (stream.into_parts().0, Protocol::Http1)),
) )
.map_err(|e| match e { .map_err(|e| match e {
@ -196,7 +200,7 @@ where
let ssl_service = TimeoutService::new( let ssl_service = TimeoutService::new(
self.timeout, self.timeout,
apply_fn(self.connector.clone(), |msg: Connect, srv| { apply_fn(self.connector.clone(), |msg: Connect, srv| {
srv.call(actix_connect::Connect::new(msg.host(), msg.port())) srv.call(actix_connect::Connect::with_request(msg))
}) })
.map_err(ConnectError::from) .map_err(ConnectError::from)
.and_then( .and_then(
@ -226,7 +230,7 @@ where
let tcp_service = TimeoutService::new( let tcp_service = TimeoutService::new(
self.timeout, self.timeout,
apply_fn(self.connector.clone(), |msg: Connect, srv| { apply_fn(self.connector.clone(), |msg: Connect, srv| {
srv.call(actix_connect::Connect::new(msg.host(), msg.port())) srv.call(actix_connect::Connect::with_request(msg))
}) })
.map_err(ConnectError::from) .map_err(ConnectError::from)
.map(|stream| (stream.into_parts().0, Protocol::Http1)), .map(|stream| (stream.into_parts().0, Protocol::Http1)),
@ -267,11 +271,7 @@ mod connect_impl {
pub(crate) struct InnerConnector<T, Io> pub(crate) struct InnerConnector<T, Io>
where where
Io: AsyncRead + AsyncWrite + 'static, Io: AsyncRead + AsyncWrite + 'static,
T: Service< T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectorError>,
Request = Connect,
Response = (Connect, Io, Protocol),
Error = ConnectorError,
>,
{ {
pub(crate) tcp_pool: ConnectionPool<T, Io>, pub(crate) tcp_pool: ConnectionPool<T, Io>,
} }

View File

@ -2,7 +2,7 @@
use std::marker::PhantomData; use std::marker::PhantomData;
use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_connect::{default_connector, Connect as TcpConnect, ConnectError}; use actix_connect::{default_connector, Address, Connect as TcpConnect, ConnectError};
use actix_service::{apply_fn, Service}; use actix_service::{apply_fn, Service};
use base64; use base64;
use futures::future::{err, Either, FutureResult}; use futures::future::{err, Either, FutureResult};
@ -29,12 +29,12 @@ impl Client<()> {
/// Create client with default connector. /// Create client with default connector.
pub fn default() -> Client< pub fn default() -> Client<
impl Service< impl Service<
Request = TcpConnect, Request = TcpConnect<(String, u16)>,
Response = impl AsyncRead + AsyncWrite, Response = impl AsyncRead + AsyncWrite,
Error = ConnectError, Error = ConnectError,
> + Clone, > + Clone,
> { > {
Client::new(apply_fn(default_connector(), |msg: TcpConnect, srv| { Client::new(apply_fn(default_connector(), |msg: TcpConnect<_>, srv| {
srv.call(msg).map(|stream| stream.into_parts().0) srv.call(msg).map(|stream| stream.into_parts().0)
})) }))
} }
@ -42,7 +42,7 @@ impl Client<()> {
impl<T> Client<T> impl<T> Client<T>
where where
T: Service<Request = TcpConnect, Error = ConnectError>, T: Service<Request = TcpConnect<(String, u16)>, Error = ConnectError>,
T::Response: AsyncRead + AsyncWrite, T::Response: AsyncRead + AsyncWrite,
{ {
/// Create new websocket's client factory /// Create new websocket's client factory
@ -53,7 +53,7 @@ where
impl<T> Clone for Client<T> impl<T> Clone for Client<T>
where where
T: Service<Request = TcpConnect, Error = ConnectError> + Clone, T: Service<Request = TcpConnect<(String, u16)>, Error = ConnectError> + Clone,
T::Response: AsyncRead + AsyncWrite, T::Response: AsyncRead + AsyncWrite,
{ {
fn clone(&self) -> Self { fn clone(&self) -> Self {
@ -65,7 +65,7 @@ where
impl<T> Service for Client<T> impl<T> Service for Client<T>
where where
T: Service<Request = TcpConnect, Error = ConnectError>, T: Service<Request = TcpConnect<(String, u16)>, Error = ConnectError>,
T::Response: AsyncRead + AsyncWrite + 'static, T::Response: AsyncRead + AsyncWrite + 'static,
T::Future: 'static, T::Future: 'static,
{ {
@ -130,8 +130,8 @@ where
); );
// prep connection // prep connection
let connect = TcpConnect::new( let connect = TcpConnect::from_string(
request.uri().host().unwrap(), request.uri().host().unwrap().to_string(),
request.uri().port().unwrap_or_else(|| proto.port()), request.uri().port().unwrap_or_else(|| proto.port()),
); );