1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-06-24 22:37:35 +02:00

allow to specify server address for http and ws requests

This commit is contained in:
Nikolay Kim
2019-04-19 18:03:44 -07:00
parent 7292d0b696
commit fc9b14a933
11 changed files with 129 additions and 78 deletions

View File

@ -8,6 +8,8 @@
* Cookie::max_age_time() accepts value in time::Duration
* Allow to specify server address for client connector
## [0.1.0] - 2019-04-16

View File

@ -46,7 +46,7 @@ secure-cookies = ["ring"]
[dependencies]
actix-service = "0.3.6"
actix-codec = "0.1.2"
actix-connect = "0.1.4"
actix-connect = "0.1.5"
actix-utils = "0.3.5"
actix-server-config = "0.1.1"
actix-threadpool = "0.1.0"

View File

@ -14,6 +14,7 @@ use tokio_tcp::TcpStream;
use super::connection::Connection;
use super::error::ConnectError;
use super::pool::{ConnectionPool, Protocol};
use super::Connect;
#[cfg(feature = "ssl")]
use openssl::ssl::SslConnector;
@ -177,15 +178,17 @@ where
/// its combinator chain.
pub fn finish(
self,
) -> impl Service<Request = Uri, Response = impl Connection, Error = ConnectError> + Clone
{
) -> impl Service<Request = Connect, Response = impl Connection, Error = ConnectError>
+ Clone {
#[cfg(not(feature = "ssl"))]
{
let connector = TimeoutService::new(
self.timeout,
apply_fn(self.connector, |msg: Uri, srv| srv.call(msg.into()))
.map_err(ConnectError::from)
.map(|stream| (stream.into_parts().0, Protocol::Http1)),
apply_fn(self.connector, |msg: Connect, srv| {
srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr))
})
.map_err(ConnectError::from)
.map(|stream| (stream.into_parts().0, Protocol::Http1)),
)
.map_err(|e| match e {
TimeoutError::Service(e) => e,
@ -209,26 +212,28 @@ where
let ssl_service = TimeoutService::new(
self.timeout,
apply_fn(self.connector.clone(), |msg: Uri, srv| srv.call(msg.into()))
.map_err(ConnectError::from)
.and_then(
OpensslConnector::service(self.ssl)
.map_err(ConnectError::from)
.map(|stream| {
let sock = stream.into_parts().0;
let h2 = sock
.get_ref()
.ssl()
.selected_alpn_protocol()
.map(|protos| protos.windows(2).any(|w| w == H2))
.unwrap_or(false);
if h2 {
(sock, Protocol::Http2)
} else {
(sock, Protocol::Http1)
}
}),
),
apply_fn(self.connector.clone(), |msg: Connect, srv| {
srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr))
})
.map_err(ConnectError::from)
.and_then(
OpensslConnector::service(self.ssl)
.map_err(ConnectError::from)
.map(|stream| {
let sock = stream.into_parts().0;
let h2 = sock
.get_ref()
.ssl()
.selected_alpn_protocol()
.map(|protos| protos.windows(2).any(|w| w == H2))
.unwrap_or(false);
if h2 {
(sock, Protocol::Http2)
} else {
(sock, Protocol::Http1)
}
}),
),
)
.map_err(|e| match e {
TimeoutError::Service(e) => e,
@ -237,9 +242,11 @@ where
let tcp_service = TimeoutService::new(
self.timeout,
apply_fn(self.connector.clone(), |msg: Uri, srv| srv.call(msg.into()))
.map_err(ConnectError::from)
.map(|stream| (stream.into_parts().0, Protocol::Http1)),
apply_fn(self.connector.clone(), |msg: Connect, srv| {
srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr))
})
.map_err(ConnectError::from)
.map(|stream| (stream.into_parts().0, Protocol::Http1)),
)
.map_err(|e| match e {
TimeoutError::Service(e) => e,
@ -264,15 +271,6 @@ where
}
}
}
#[doc(hidden)]
#[deprecated(since = "0.1.0-alpha4", note = "please use `.finish()` method")]
pub fn service(
self,
) -> impl Service<Request = Uri, Response = impl Connection, Error = ConnectError> + Clone
{
self.finish()
}
}
#[cfg(not(feature = "ssl"))]
@ -286,7 +284,7 @@ mod connect_impl {
pub(crate) struct InnerConnector<T, Io>
where
Io: AsyncRead + AsyncWrite + 'static,
T: Service<Request = Uri, Response = (Io, Protocol), Error = ConnectError>,
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>,
{
pub(crate) tcp_pool: ConnectionPool<T, Io>,
}
@ -294,7 +292,7 @@ mod connect_impl {
impl<T, Io> Clone for InnerConnector<T, Io>
where
Io: AsyncRead + AsyncWrite + 'static,
T: Service<Request = Uri, Response = (Io, Protocol), Error = ConnectError>
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
+ Clone,
{
fn clone(&self) -> Self {
@ -307,9 +305,9 @@ mod connect_impl {
impl<T, Io> Service for InnerConnector<T, Io>
where
Io: AsyncRead + AsyncWrite + 'static,
T: Service<Request = Uri, Response = (Io, Protocol), Error = ConnectError>,
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>,
{
type Request = Uri;
type Request = Connect;
type Response = IoConnection<Io>;
type Error = ConnectError;
type Future = Either<
@ -346,8 +344,8 @@ mod connect_impl {
where
Io1: AsyncRead + AsyncWrite + 'static,
Io2: AsyncRead + AsyncWrite + 'static,
T1: Service<Request = Uri, Response = (Io1, Protocol), Error = ConnectError>,
T2: Service<Request = Uri, Response = (Io2, Protocol), Error = ConnectError>,
T1: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>,
T2: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>,
{
pub(crate) tcp_pool: ConnectionPool<T1, Io1>,
pub(crate) ssl_pool: ConnectionPool<T2, Io2>,
@ -357,9 +355,9 @@ mod connect_impl {
where
Io1: AsyncRead + AsyncWrite + 'static,
Io2: AsyncRead + AsyncWrite + 'static,
T1: Service<Request = Uri, Response = (Io1, Protocol), Error = ConnectError>
T1: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>
+ Clone,
T2: Service<Request = Uri, Response = (Io2, Protocol), Error = ConnectError>
T2: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>
+ Clone,
{
fn clone(&self) -> Self {
@ -374,10 +372,10 @@ mod connect_impl {
where
Io1: AsyncRead + AsyncWrite + 'static,
Io2: AsyncRead + AsyncWrite + 'static,
T1: Service<Request = Uri, Response = (Io1, Protocol), Error = ConnectError>,
T2: Service<Request = Uri, Response = (Io2, Protocol), Error = ConnectError>,
T1: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>,
T2: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>,
{
type Request = Uri;
type Request = Connect;
type Response = EitherConnection<Io1, Io2>;
type Error = ConnectError;
type Future = Either<
@ -392,8 +390,8 @@ mod connect_impl {
self.tcp_pool.poll_ready()
}
fn call(&mut self, req: Uri) -> Self::Future {
match req.scheme_str() {
fn call(&mut self, req: Connect) -> Self::Future {
match req.uri.scheme_str() {
Some("https") | Some("wss") => {
Either::B(Either::B(InnerConnectorResponseB {
fut: self.ssl_pool.call(req),
@ -411,7 +409,7 @@ mod connect_impl {
pub(crate) struct InnerConnectorResponseA<T, Io1, Io2>
where
Io1: AsyncRead + AsyncWrite + 'static,
T: Service<Request = Uri, Response = (Io1, Protocol), Error = ConnectError>,
T: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>,
{
fut: <ConnectionPool<T, Io1> as Service>::Future,
_t: PhantomData<Io2>,
@ -419,7 +417,7 @@ mod connect_impl {
impl<T, Io1, Io2> Future for InnerConnectorResponseA<T, Io1, Io2>
where
T: Service<Request = Uri, Response = (Io1, Protocol), Error = ConnectError>,
T: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>,
Io1: AsyncRead + AsyncWrite + 'static,
Io2: AsyncRead + AsyncWrite + 'static,
{
@ -437,7 +435,7 @@ mod connect_impl {
pub(crate) struct InnerConnectorResponseB<T, Io1, Io2>
where
Io2: AsyncRead + AsyncWrite + 'static,
T: Service<Request = Uri, Response = (Io2, Protocol), Error = ConnectError>,
T: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>,
{
fut: <ConnectionPool<T, Io2> as Service>::Future,
_t: PhantomData<Io1>,
@ -445,7 +443,7 @@ mod connect_impl {
impl<T, Io1, Io2> Future for InnerConnectorResponseB<T, Io1, Io2>
where
T: Service<Request = Uri, Response = (Io2, Protocol), Error = ConnectError>,
T: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>,
Io1: AsyncRead + AsyncWrite + 'static,
Io2: AsyncRead + AsyncWrite + 'static,
{

View File

@ -1,4 +1,6 @@
//! Http client api
use http::Uri;
mod connection;
mod connector;
mod error;
@ -10,3 +12,9 @@ pub use self::connection::Connection;
pub use self::connector::Connector;
pub use self::error::{ConnectError, InvalidUrl, SendRequestError};
pub use self::pool::Protocol;
#[derive(Clone)]
pub struct Connect {
pub uri: Uri,
pub addr: Option<std::net::SocketAddr>,
}

View File

@ -13,13 +13,14 @@ use futures::unsync::oneshot;
use futures::{Async, Future, Poll};
use h2::client::{handshake, Handshake};
use hashbrown::HashMap;
use http::uri::{Authority, Uri};
use http::uri::Authority;
use indexmap::IndexSet;
use slab::Slab;
use tokio_timer::{sleep, Delay};
use super::connection::{ConnectionType, IoConnection};
use super::error::ConnectError;
use super::Connect;
#[derive(Clone, Copy, PartialEq)]
/// Protocol version
@ -48,7 +49,7 @@ pub(crate) struct ConnectionPool<T, Io: AsyncRead + AsyncWrite + 'static>(
impl<T, Io> ConnectionPool<T, Io>
where
Io: AsyncRead + AsyncWrite + 'static,
T: Service<Request = Uri, Response = (Io, Protocol), Error = ConnectError>,
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>,
{
pub(crate) fn new(
connector: T,
@ -87,9 +88,9 @@ where
impl<T, Io> Service for ConnectionPool<T, Io>
where
Io: AsyncRead + AsyncWrite + 'static,
T: Service<Request = Uri, Response = (Io, Protocol), Error = ConnectError>,
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>,
{
type Request = Uri;
type Request = Connect;
type Response = IoConnection<Io>;
type Error = ConnectError;
type Future = Either<
@ -101,8 +102,8 @@ where
self.0.poll_ready()
}
fn call(&mut self, req: Uri) -> Self::Future {
let key = if let Some(authority) = req.authority_part() {
fn call(&mut self, req: Connect) -> Self::Future {
let key = if let Some(authority) = req.uri.authority_part() {
authority.clone().into()
} else {
return Either::A(err(ConnectError::Unresolverd));
@ -292,7 +293,10 @@ pub(crate) struct Inner<Io> {
limit: usize,
acquired: usize,
available: HashMap<Key, VecDeque<AvailableConnection<Io>>>,
waiters: Slab<(Uri, oneshot::Sender<Result<IoConnection<Io>, ConnectError>>)>,
waiters: Slab<(
Connect,
oneshot::Sender<Result<IoConnection<Io>, ConnectError>>,
)>,
waiters_queue: IndexSet<(Key, usize)>,
task: AtomicTask,
}
@ -331,14 +335,14 @@ where
/// connection is not available, wait
fn wait_for(
&mut self,
connect: Uri,
connect: Connect,
) -> (
oneshot::Receiver<Result<IoConnection<Io>, ConnectError>>,
usize,
) {
let (tx, rx) = oneshot::channel();
let key: Key = connect.authority_part().unwrap().clone().into();
let key: Key = connect.uri.authority_part().unwrap().clone().into();
let entry = self.waiters.vacant_entry();
let token = entry.key();
entry.insert((connect, tx));