1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-01-22 23:05:56 +01:00

use Uri as client connect message

This commit is contained in:
Nikolay Kim 2019-03-14 11:52:52 -07:00
parent d2c755bb47
commit b8bfd29d2c
8 changed files with 112 additions and 293 deletions

View File

@ -55,7 +55,7 @@ encoding = "0.2"
futures = "0.1"
hashbrown = "0.1.8"
h2 = "0.1.16"
http = "0.1.8"
http = "0.1.16"
httparse = "1.3"
indexmap = "1.0"
lazy_static = "1.0"

View File

@ -1,78 +0,0 @@
use actix_connect::Address;
use http::uri::Uri;
use http::HttpTryFrom;
use super::error::InvalidUrl;
use super::pool::Key;
#[derive(Debug)]
/// `Connect` type represents a message that can be sent to
/// `Connector` with a connection request.
pub struct Connect {
pub(crate) uri: Uri,
}
impl Connect {
/// Create `Connect` message for specified `Uri`
pub fn new(uri: Uri) -> Connect {
Connect { uri }
}
/// Construct `Uri` instance and create `Connect` message.
pub fn try_from<U>(uri: U) -> Result<Connect, InvalidUrl>
where
Uri: HttpTryFrom<U>,
{
Ok(Connect {
uri: Uri::try_from(uri).map_err(|e| e.into())?,
})
}
pub(crate) fn is_secure(&self) -> bool {
if let Some(scheme) = self.uri.scheme_part() {
scheme.as_str() == "https"
} else {
false
}
}
pub(crate) fn key(&self) -> Key {
self.uri.authority_part().unwrap().clone().into()
}
pub(crate) fn validate(&self) -> Result<(), InvalidUrl> {
if self.uri.host().is_none() {
Err(InvalidUrl::MissingHost)
} else if self.uri.scheme_part().is_none() {
Err(InvalidUrl::MissingScheme)
} else if let Some(scheme) = self.uri.scheme_part() {
match scheme.as_str() {
"http" | "ws" | "https" | "wss" => Ok(()),
_ => Err(InvalidUrl::UnknownScheme),
}
} else {
Ok(())
}
}
}
impl Address for Connect {
fn host(&self) -> &str {
&self.uri.host().unwrap()
}
fn port(&self) -> Option<u16> {
let port = if let Some(port) = self.uri.port() {
port
} else if let Some(scheme) = self.uri.scheme_part() {
match scheme.as_str() {
"http" | "ws" => 80,
"https" | "wss" => 443,
_ => 80,
}
} else {
80
};
Some(port)
}
}

View File

@ -8,9 +8,9 @@ use actix_connect::{
};
use actix_service::{apply_fn, Service, ServiceExt};
use actix_utils::timeout::{TimeoutError, TimeoutService};
use http::Uri;
use tokio_tcp::TcpStream;
use super::connect::Connect;
use super::connection::Connection;
use super::error::ConnectError;
use super::pool::{ConnectionPool, Protocol};
@ -38,8 +38,8 @@ pub struct Connector<T, U> {
impl Connector<(), ()> {
pub fn new() -> Connector<
impl Service<
Request = TcpConnect<Connect>,
Response = TcpConnection<Connect, TcpStream>,
Request = TcpConnect<Uri>,
Response = TcpConnection<Uri, TcpStream>,
Error = actix_connect::ConnectError,
> + Clone,
TcpStream,
@ -79,8 +79,8 @@ impl<T, U> Connector<T, U> {
where
U1: AsyncRead + AsyncWrite + fmt::Debug,
T1: Service<
Request = TcpConnect<Connect>,
Response = TcpConnection<Connect, U1>,
Request = TcpConnect<Uri>,
Response = TcpConnection<Uri, U1>,
Error = actix_connect::ConnectError,
> + Clone,
{
@ -101,8 +101,8 @@ impl<T, U> Connector<T, U>
where
U: AsyncRead + AsyncWrite + fmt::Debug + 'static,
T: Service<
Request = TcpConnect<Connect>,
Response = TcpConnection<Connect, U>,
Request = TcpConnect<Uri>,
Response = TcpConnection<Uri, U>,
Error = actix_connect::ConnectError,
> + Clone,
{
@ -166,16 +166,14 @@ where
/// Finish configuration process and create connector service.
pub fn service(
self,
) -> impl Service<Request = Connect, Response = impl Connection, Error = ConnectError>
+ Clone {
) -> impl Service<Request = Uri, Response = impl Connection, Error = ConnectError> + Clone
{
#[cfg(not(feature = "ssl"))]
{
let connector = TimeoutService::new(
self.timeout,
apply_fn(self.connector, |msg: Connect, srv| {
srv.call(actix_connect::Connect::new(msg))
})
.map(|stream| (stream.into_parts().0, Protocol::Http1)),
apply_fn(self.connector, |msg: Uri, srv| srv.call(msg.into()))
.map(|stream| (stream.into_parts().0, Protocol::Http1)),
)
.map_err(|e| match e {
TimeoutError::Service(e) => e,
@ -199,28 +197,26 @@ where
let ssl_service = TimeoutService::new(
self.timeout,
apply_fn(self.connector.clone(), |msg: Connect, srv| {
srv.call(actix_connect::Connect::new(msg))
})
.map_err(ConnectError::from)
.and_then(
OpensslConnector::service(self.ssl)
.map_err(ConnectError::from)
.map(|stream| {
let sock = stream.into_parts().0;
let h2 = sock
.get_ref()
.ssl()
.selected_alpn_protocol()
.map(|protos| protos.windows(2).any(|w| w == H2))
.unwrap_or(false);
if h2 {
(sock, Protocol::Http2)
} else {
(sock, Protocol::Http1)
}
}),
),
apply_fn(self.connector.clone(), |msg: Uri, srv| srv.call(msg.into()))
.map_err(ConnectError::from)
.and_then(
OpensslConnector::service(self.ssl)
.map_err(ConnectError::from)
.map(|stream| {
let sock = stream.into_parts().0;
let h2 = sock
.get_ref()
.ssl()
.selected_alpn_protocol()
.map(|protos| protos.windows(2).any(|w| w == H2))
.unwrap_or(false);
if h2 {
(sock, Protocol::Http2)
} else {
(sock, Protocol::Http1)
}
}),
),
)
.map_err(|e| match e {
TimeoutError::Service(e) => e,
@ -229,11 +225,9 @@ where
let tcp_service = TimeoutService::new(
self.timeout,
apply_fn(self.connector.clone(), |msg: Connect, srv| {
srv.call(actix_connect::Connect::new(msg))
})
.map_err(ConnectError::from)
.map(|stream| (stream.into_parts().0, Protocol::Http1)),
apply_fn(self.connector.clone(), |msg: Uri, srv| srv.call(msg.into()))
.map_err(ConnectError::from)
.map(|stream| (stream.into_parts().0, Protocol::Http1)),
)
.map_err(|e| match e {
TimeoutError::Service(e) => e,
@ -271,7 +265,7 @@ mod connect_impl {
pub(crate) struct InnerConnector<T, Io>
where
Io: AsyncRead + AsyncWrite + 'static,
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectorError>,
T: Service<Request = Uri, Response = (Io, Protocol), Error = ConnectorError>,
{
pub(crate) tcp_pool: ConnectionPool<T, Io>,
}
@ -279,7 +273,7 @@ mod connect_impl {
impl<T, Io> Clone for InnerConnector<T, Io>
where
Io: AsyncRead + AsyncWrite + 'static,
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
T: Service<Request = Uri, Response = (Io, Protocol), Error = ConnectError>
+ Clone,
{
fn clone(&self) -> Self {
@ -292,9 +286,9 @@ mod connect_impl {
impl<T, Io> Service for InnerConnector<T, Io>
where
Io: AsyncRead + AsyncWrite + 'static,
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectorError>,
T: Service<Request = Uri, Response = (Io, Protocol), Error = ConnectorError>,
{
type Request = Connect;
type Request = Uri;
type Response = IoConnection<Io>;
type Error = ConnectorError;
type Future = Either<
@ -306,13 +300,12 @@ mod connect_impl {
self.tcp_pool.poll_ready()
}
fn call(&mut self, req: Connect) -> Self::Future {
if req.is_secure() {
Either::B(err(ConnectError::SslIsNotSupported))
} else if let Err(e) = req.validate() {
Either::B(err(e))
} else {
Either::A(self.tcp_pool.call(req))
fn call(&mut self, req: Uri) -> Self::Future {
match req.scheme_str() {
Some("https") | Some("wss") => {
Either::B(err(ConnectError::SslIsNotSupported))
}
_ => Either::A(self.tcp_pool.call(req)),
}
}
}
@ -332,8 +325,8 @@ mod connect_impl {
where
Io1: AsyncRead + AsyncWrite + 'static,
Io2: AsyncRead + AsyncWrite + 'static,
T1: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>,
T2: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>,
T1: Service<Request = Uri, Response = (Io1, Protocol), Error = ConnectError>,
T2: Service<Request = Uri, Response = (Io2, Protocol), Error = ConnectError>,
{
pub(crate) tcp_pool: ConnectionPool<T1, Io1>,
pub(crate) ssl_pool: ConnectionPool<T2, Io2>,
@ -343,9 +336,9 @@ mod connect_impl {
where
Io1: AsyncRead + AsyncWrite + 'static,
Io2: AsyncRead + AsyncWrite + 'static,
T1: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>
T1: Service<Request = Uri, Response = (Io1, Protocol), Error = ConnectError>
+ Clone,
T2: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>
T2: Service<Request = Uri, Response = (Io2, Protocol), Error = ConnectError>
+ Clone,
{
fn clone(&self) -> Self {
@ -360,10 +353,10 @@ mod connect_impl {
where
Io1: AsyncRead + AsyncWrite + 'static,
Io2: AsyncRead + AsyncWrite + 'static,
T1: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>,
T2: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>,
T1: Service<Request = Uri, Response = (Io1, Protocol), Error = ConnectError>,
T2: Service<Request = Uri, Response = (Io2, Protocol), Error = ConnectError>,
{
type Request = Connect;
type Request = Uri;
type Response = EitherConnection<Io1, Io2>;
type Error = ConnectError;
type Future = Either<
@ -378,17 +371,18 @@ mod connect_impl {
self.tcp_pool.poll_ready()
}
fn call(&mut self, req: Connect) -> Self::Future {
if req.is_secure() {
Either::B(Either::B(InnerConnectorResponseB {
fut: self.ssl_pool.call(req),
_t: PhantomData,
}))
} else {
Either::B(Either::A(InnerConnectorResponseA {
fn call(&mut self, req: Uri) -> Self::Future {
match req.scheme_str() {
Some("https") | Some("wss") => {
Either::B(Either::B(InnerConnectorResponseB {
fut: self.ssl_pool.call(req),
_t: PhantomData,
}))
}
_ => Either::B(Either::A(InnerConnectorResponseA {
fut: self.tcp_pool.call(req),
_t: PhantomData,
}))
})),
}
}
}
@ -396,7 +390,7 @@ mod connect_impl {
pub(crate) struct InnerConnectorResponseA<T, Io1, Io2>
where
Io1: AsyncRead + AsyncWrite + 'static,
T: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>,
T: Service<Request = Uri, Response = (Io1, Protocol), Error = ConnectError>,
{
fut: <ConnectionPool<T, Io1> as Service>::Future,
_t: PhantomData<Io2>,
@ -404,7 +398,7 @@ mod connect_impl {
impl<T, Io1, Io2> Future for InnerConnectorResponseA<T, Io1, Io2>
where
T: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>,
T: Service<Request = Uri, Response = (Io1, Protocol), Error = ConnectError>,
Io1: AsyncRead + AsyncWrite + 'static,
Io2: AsyncRead + AsyncWrite + 'static,
{
@ -422,7 +416,7 @@ mod connect_impl {
pub(crate) struct InnerConnectorResponseB<T, Io1, Io2>
where
Io2: AsyncRead + AsyncWrite + 'static,
T: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>,
T: Service<Request = Uri, Response = (Io2, Protocol), Error = ConnectError>,
{
fut: <ConnectionPool<T, Io2> as Service>::Future,
_t: PhantomData<Io1>,
@ -430,7 +424,7 @@ mod connect_impl {
impl<T, Io1, Io2> Future for InnerConnectorResponseB<T, Io1, Io2>
where
T: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>,
T: Service<Request = Uri, Response = (Io2, Protocol), Error = ConnectError>,
Io1: AsyncRead + AsyncWrite + 'static,
Io2: AsyncRead + AsyncWrite + 'static,
{

View File

@ -1,5 +1,4 @@
//! Http client api
mod connect;
mod connection;
mod connector;
mod error;
@ -9,7 +8,6 @@ mod pool;
mod request;
mod response;
pub use self::connect::Connect;
pub use self::connection::Connection;
pub use self::connector::Connector;
pub use self::error::{ConnectError, InvalidUrl, SendRequestError};

View File

@ -7,18 +7,17 @@ use std::time::{Duration, Instant};
use actix_codec::{AsyncRead, AsyncWrite};
use actix_service::Service;
use bytes::Bytes;
use futures::future::{ok, Either, FutureResult};
use futures::future::{err, ok, Either, FutureResult};
use futures::task::AtomicTask;
use futures::unsync::oneshot;
use futures::{Async, Future, Poll};
use h2::client::{handshake, Handshake};
use hashbrown::HashMap;
use http::uri::Authority;
use http::uri::{Authority, Uri};
use indexmap::IndexSet;
use slab::Slab;
use tokio_timer::{sleep, Delay};
use super::connect::Connect;
use super::connection::{ConnectionType, IoConnection};
use super::error::ConnectError;
@ -48,7 +47,7 @@ pub(crate) struct ConnectionPool<T, Io: AsyncRead + AsyncWrite + 'static>(
impl<T, Io> ConnectionPool<T, Io>
where
Io: AsyncRead + AsyncWrite + 'static,
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>,
T: Service<Request = Uri, Response = (Io, Protocol), Error = ConnectError>,
{
pub(crate) fn new(
connector: T,
@ -87,9 +86,9 @@ where
impl<T, Io> Service for ConnectionPool<T, Io>
where
Io: AsyncRead + AsyncWrite + 'static,
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>,
T: Service<Request = Uri, Response = (Io, Protocol), Error = ConnectError>,
{
type Request = Connect;
type Request = Uri;
type Response = IoConnection<Io>;
type Error = ConnectError;
type Future = Either<
@ -101,8 +100,12 @@ where
self.0.poll_ready()
}
fn call(&mut self, req: Connect) -> Self::Future {
let key = req.key();
fn call(&mut self, req: Uri) -> Self::Future {
let key = if let Some(authority) = req.authority_part() {
authority.clone().into()
} else {
return Either::A(err(ConnectError::Unresolverd));
};
// acquire connection
match self.1.as_ref().borrow_mut().acquire(&key) {
@ -268,110 +271,6 @@ where
}
}
// struct OpenWaitingConnection<F, Io>
// where
// Io: AsyncRead + AsyncWrite + 'static,
// {
// fut: F,
// key: Key,
// h2: Option<Handshake<Io, Bytes>>,
// rx: Option<oneshot::Sender<Result<IoConnection<Io>, ConnectorError>>>,
// inner: Option<Rc<RefCell<Inner<Io>>>>,
// }
// impl<F, Io> OpenWaitingConnection<F, Io>
// where
// F: Future<Item = (Io, Protocol), Error = ConnectorError> + 'static,
// Io: AsyncRead + AsyncWrite + 'static,
// {
// fn spawn(
// key: Key,
// rx: oneshot::Sender<Result<IoConnection<Io>, ConnectorError>>,
// inner: Rc<RefCell<Inner<Io>>>,
// fut: F,
// ) {
// tokio_current_thread::spawn(OpenWaitingConnection {
// key,
// fut,
// h2: None,
// rx: Some(rx),
// inner: Some(inner),
// })
// }
// }
// impl<F, Io> Drop for OpenWaitingConnection<F, Io>
// where
// Io: AsyncRead + AsyncWrite + 'static,
// {
// fn drop(&mut self) {
// if let Some(inner) = self.inner.take() {
// let mut inner = inner.as_ref().borrow_mut();
// inner.release();
// inner.check_availibility();
// }
// }
// }
// impl<F, Io> Future for OpenWaitingConnection<F, Io>
// where
// F: Future<Item = (Io, Protocol), Error = ConnectorError>,
// Io: AsyncRead + AsyncWrite,
// {
// type Item = ();
// type Error = ();
// fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// if let Some(ref mut h2) = self.h2 {
// return match h2.poll() {
// Ok(Async::Ready((snd, connection))) => {
// tokio_current_thread::spawn(connection.map_err(|_| ()));
// let _ = self.rx.take().unwrap().send(Ok(IoConnection::new(
// ConnectionType::H2(snd),
// Instant::now(),
// Some(Acquired(self.key.clone(), self.inner.clone())),
// )));
// Ok(Async::Ready(()))
// }
// Ok(Async::NotReady) => Ok(Async::NotReady),
// Err(e) => {
// let _ = self.inner.take();
// if let Some(rx) = self.rx.take() {
// let _ = rx.send(Err(e.into()));
// }
// Err(())
// }
// };
// }
// match self.fut.poll() {
// Err(err) => {
// let _ = self.inner.take();
// if let Some(rx) = self.rx.take() {
// let _ = rx.send(Err(err));
// }
// Err(())
// }
// Ok(Async::Ready((_, io, proto))) => {
// let _ = self.inner.take();
// if proto == Protocol::Http1 {
// let _ = self.rx.take().unwrap().send(Ok(IoConnection::new(
// ConnectionType::H1(io),
// Instant::now(),
// Some(Acquired(self.key.clone(), self.inner.clone())),
// )));
// } else {
// self.h2 = Some(handshake(io));
// return self.poll();
// }
// Ok(Async::Ready(()))
// }
// Ok(Async::NotReady) => Ok(Async::NotReady),
// }
// }
// }
enum Acquire<T> {
Acquired(ConnectionType<T>, Instant),
Available,
@ -392,10 +291,7 @@ pub(crate) struct Inner<Io> {
limit: usize,
acquired: usize,
available: HashMap<Key, VecDeque<AvailableConnection<Io>>>,
waiters: Slab<(
Connect,
oneshot::Sender<Result<IoConnection<Io>, ConnectError>>,
)>,
waiters: Slab<(Uri, oneshot::Sender<Result<IoConnection<Io>, ConnectError>>)>,
waiters_queue: IndexSet<(Key, usize)>,
task: AtomicTask,
}
@ -434,14 +330,14 @@ where
/// connection is not available, wait
fn wait_for(
&mut self,
connect: Connect,
connect: Uri,
) -> (
oneshot::Receiver<Result<IoConnection<Io>, ConnectError>>,
usize,
) {
let (tx, rx) = oneshot::channel();
let key = connect.key();
let key: Key = connect.authority_part().unwrap().clone().into();
let entry = self.waiters.vacant_entry();
let token = entry.key();
entry.insert((connect, tx));

View File

@ -21,8 +21,8 @@ use crate::http::{
use crate::message::{ConnectionType, Head, RequestHead};
use super::connection::Connection;
use super::error::{ConnectError, InvalidUrl, SendRequestError};
use super::response::ClientResponse;
use super::{Connect, ConnectError, SendRequestError};
/// An HTTP Client Request
///
@ -180,23 +180,32 @@ where
) -> impl Future<Item = ClientResponse, Error = SendRequestError>
where
B: 'static,
T: Service<Request = Connect, Response = I, Error = ConnectError>,
T: Service<Request = Uri, Response = I, Error = ConnectError>,
I: Connection,
{
let Self { head, body } = self;
let connect = Connect::new(head.uri.clone());
if let Err(e) = connect.validate() {
Either::A(err(e.into()))
let uri = head.uri.clone();
// validate uri
if uri.host().is_none() {
Either::A(err(InvalidUrl::MissingHost.into()))
} else if uri.scheme_part().is_none() {
Either::A(err(InvalidUrl::MissingScheme.into()))
} else if let Some(scheme) = uri.scheme_part() {
match scheme.as_str() {
"http" | "ws" | "https" | "wss" => Either::B(
connector
// connect to the host
.call(uri)
.from_err()
// send request
.and_then(move |connection| connection.send_request(head, body)),
),
_ => Either::A(err(InvalidUrl::UnknownScheme.into())),
}
} else {
Either::B(
connector
// connect to the host
.call(connect)
.from_err()
// send request
.and_then(move |connection| connection.send_request(head, body)),
)
Either::A(err(InvalidUrl::UnknownScheme.into()))
}
}
}
@ -529,7 +538,7 @@ impl ClientRequestBuilder {
if !parts.headers.contains_key(header::HOST) {
let mut wrt = BytesMut::with_capacity(host.len() + 5).writer();
let _ = match parts.uri.port() {
let _ = match parts.uri.port_u16() {
None | Some(80) | Some(443) => write!(wrt, "{}", host),
Some(port) => write!(wrt, "{}:{}", host, port),
};

View File

@ -2,7 +2,7 @@
use std::marker::PhantomData;
use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_connect::{default_connector, Address, Connect as TcpConnect, ConnectError};
use actix_connect::{default_connector, Connect as TcpConnect, ConnectError};
use actix_service::{apply_fn, Service};
use base64;
use futures::future::{err, Either, FutureResult};
@ -131,7 +131,7 @@ where
// prep connection
let connect = TcpConnect::new(request.uri().host().unwrap().to_string())
.set_port(request.uri().port().unwrap_or_else(|| proto.port()));
.set_port(request.uri().port_u16().unwrap_or_else(|| proto.port()));
let fut = Box::new(
self.connector

View File

@ -5,10 +5,10 @@ use std::{net, thread, time};
use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_http::body::MessageBody;
use actix_http::client::{
ClientRequest, ClientRequestBuilder, ClientResponse, Connect, ConnectError,
Connection, Connector, SendRequestError,
ClientRequest, ClientRequestBuilder, ClientResponse, ConnectError, Connection,
Connector, SendRequestError,
};
use actix_http::ws;
use actix_http::{http::Uri, ws};
use actix_rt::{Runtime, System};
use actix_server::{Server, StreamServiceFactory};
use actix_service::Service;
@ -158,8 +158,8 @@ impl TestServerRuntime {
}
fn new_connector(
) -> impl Service<Request = Connect, Response = impl Connection, Error = ConnectError>
+ Clone {
) -> impl Service<Request = Uri, Response = impl Connection, Error = ConnectError> + Clone
{
#[cfg(feature = "ssl")]
{
use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
@ -185,8 +185,8 @@ impl TestServerRuntime {
/// Http connector
pub fn connector(
&mut self,
) -> impl Service<Request = Connect, Response = impl Connection, Error = ConnectError>
+ Clone {
) -> impl Service<Request = Uri, Response = impl Connection, Error = ConnectError> + Clone
{
self.execute(|| TestServerRuntime::new_connector())
}