diff --git a/Cargo.toml b/Cargo.toml index 11f532c8..3b9a8498 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,7 +55,7 @@ encoding = "0.2" futures = "0.1" hashbrown = "0.1.8" h2 = "0.1.16" -http = "0.1.8" +http = "0.1.16" httparse = "1.3" indexmap = "1.0" lazy_static = "1.0" diff --git a/src/client/connect.rs b/src/client/connect.rs deleted file mode 100644 index 82e5e45c..00000000 --- a/src/client/connect.rs +++ /dev/null @@ -1,78 +0,0 @@ -use actix_connect::Address; -use http::uri::Uri; -use http::HttpTryFrom; - -use super::error::InvalidUrl; -use super::pool::Key; - -#[derive(Debug)] -/// `Connect` type represents a message that can be sent to -/// `Connector` with a connection request. -pub struct Connect { - pub(crate) uri: Uri, -} - -impl Connect { - /// Create `Connect` message for specified `Uri` - pub fn new(uri: Uri) -> Connect { - Connect { uri } - } - - /// Construct `Uri` instance and create `Connect` message. - pub fn try_from(uri: U) -> Result - where - Uri: HttpTryFrom, - { - Ok(Connect { - uri: Uri::try_from(uri).map_err(|e| e.into())?, - }) - } - - pub(crate) fn is_secure(&self) -> bool { - if let Some(scheme) = self.uri.scheme_part() { - scheme.as_str() == "https" - } else { - false - } - } - - pub(crate) fn key(&self) -> Key { - self.uri.authority_part().unwrap().clone().into() - } - - pub(crate) fn validate(&self) -> Result<(), InvalidUrl> { - if self.uri.host().is_none() { - Err(InvalidUrl::MissingHost) - } else if self.uri.scheme_part().is_none() { - Err(InvalidUrl::MissingScheme) - } else if let Some(scheme) = self.uri.scheme_part() { - match scheme.as_str() { - "http" | "ws" | "https" | "wss" => Ok(()), - _ => Err(InvalidUrl::UnknownScheme), - } - } else { - Ok(()) - } - } -} - -impl Address for Connect { - fn host(&self) -> &str { - &self.uri.host().unwrap() - } - - fn port(&self) -> Option { - let port = if let Some(port) = self.uri.port() { - port - } else if let Some(scheme) = self.uri.scheme_part() { - match scheme.as_str() { - "http" | "ws" => 80, - "https" | "wss" => 443, - _ => 80, - } - } else { - 80 - }; - Some(port) - } -} diff --git a/src/client/connector.rs b/src/client/connector.rs index c764b93c..b8054151 100644 --- a/src/client/connector.rs +++ b/src/client/connector.rs @@ -8,9 +8,9 @@ use actix_connect::{ }; use actix_service::{apply_fn, Service, ServiceExt}; use actix_utils::timeout::{TimeoutError, TimeoutService}; +use http::Uri; use tokio_tcp::TcpStream; -use super::connect::Connect; use super::connection::Connection; use super::error::ConnectError; use super::pool::{ConnectionPool, Protocol}; @@ -38,8 +38,8 @@ pub struct Connector { impl Connector<(), ()> { pub fn new() -> Connector< impl Service< - Request = TcpConnect, - Response = TcpConnection, + Request = TcpConnect, + Response = TcpConnection, Error = actix_connect::ConnectError, > + Clone, TcpStream, @@ -79,8 +79,8 @@ impl Connector { where U1: AsyncRead + AsyncWrite + fmt::Debug, T1: Service< - Request = TcpConnect, - Response = TcpConnection, + Request = TcpConnect, + Response = TcpConnection, Error = actix_connect::ConnectError, > + Clone, { @@ -101,8 +101,8 @@ impl Connector where U: AsyncRead + AsyncWrite + fmt::Debug + 'static, T: Service< - Request = TcpConnect, - Response = TcpConnection, + Request = TcpConnect, + Response = TcpConnection, Error = actix_connect::ConnectError, > + Clone, { @@ -166,16 +166,14 @@ where /// Finish configuration process and create connector service. pub fn service( self, - ) -> impl Service - + Clone { + ) -> impl Service + Clone + { #[cfg(not(feature = "ssl"))] { let connector = TimeoutService::new( self.timeout, - apply_fn(self.connector, |msg: Connect, srv| { - srv.call(actix_connect::Connect::new(msg)) - }) - .map(|stream| (stream.into_parts().0, Protocol::Http1)), + apply_fn(self.connector, |msg: Uri, srv| srv.call(msg.into())) + .map(|stream| (stream.into_parts().0, Protocol::Http1)), ) .map_err(|e| match e { TimeoutError::Service(e) => e, @@ -199,28 +197,26 @@ where let ssl_service = TimeoutService::new( self.timeout, - apply_fn(self.connector.clone(), |msg: Connect, srv| { - srv.call(actix_connect::Connect::new(msg)) - }) - .map_err(ConnectError::from) - .and_then( - OpensslConnector::service(self.ssl) - .map_err(ConnectError::from) - .map(|stream| { - let sock = stream.into_parts().0; - let h2 = sock - .get_ref() - .ssl() - .selected_alpn_protocol() - .map(|protos| protos.windows(2).any(|w| w == H2)) - .unwrap_or(false); - if h2 { - (sock, Protocol::Http2) - } else { - (sock, Protocol::Http1) - } - }), - ), + apply_fn(self.connector.clone(), |msg: Uri, srv| srv.call(msg.into())) + .map_err(ConnectError::from) + .and_then( + OpensslConnector::service(self.ssl) + .map_err(ConnectError::from) + .map(|stream| { + let sock = stream.into_parts().0; + let h2 = sock + .get_ref() + .ssl() + .selected_alpn_protocol() + .map(|protos| protos.windows(2).any(|w| w == H2)) + .unwrap_or(false); + if h2 { + (sock, Protocol::Http2) + } else { + (sock, Protocol::Http1) + } + }), + ), ) .map_err(|e| match e { TimeoutError::Service(e) => e, @@ -229,11 +225,9 @@ where let tcp_service = TimeoutService::new( self.timeout, - apply_fn(self.connector.clone(), |msg: Connect, srv| { - srv.call(actix_connect::Connect::new(msg)) - }) - .map_err(ConnectError::from) - .map(|stream| (stream.into_parts().0, Protocol::Http1)), + apply_fn(self.connector.clone(), |msg: Uri, srv| srv.call(msg.into())) + .map_err(ConnectError::from) + .map(|stream| (stream.into_parts().0, Protocol::Http1)), ) .map_err(|e| match e { TimeoutError::Service(e) => e, @@ -271,7 +265,7 @@ mod connect_impl { pub(crate) struct InnerConnector where Io: AsyncRead + AsyncWrite + 'static, - T: Service, + T: Service, { pub(crate) tcp_pool: ConnectionPool, } @@ -279,7 +273,7 @@ mod connect_impl { impl Clone for InnerConnector where Io: AsyncRead + AsyncWrite + 'static, - T: Service + T: Service + Clone, { fn clone(&self) -> Self { @@ -292,9 +286,9 @@ mod connect_impl { impl Service for InnerConnector where Io: AsyncRead + AsyncWrite + 'static, - T: Service, + T: Service, { - type Request = Connect; + type Request = Uri; type Response = IoConnection; type Error = ConnectorError; type Future = Either< @@ -306,13 +300,12 @@ mod connect_impl { self.tcp_pool.poll_ready() } - fn call(&mut self, req: Connect) -> Self::Future { - if req.is_secure() { - Either::B(err(ConnectError::SslIsNotSupported)) - } else if let Err(e) = req.validate() { - Either::B(err(e)) - } else { - Either::A(self.tcp_pool.call(req)) + fn call(&mut self, req: Uri) -> Self::Future { + match req.scheme_str() { + Some("https") | Some("wss") => { + Either::B(err(ConnectError::SslIsNotSupported)) + } + _ => Either::A(self.tcp_pool.call(req)), } } } @@ -332,8 +325,8 @@ mod connect_impl { where Io1: AsyncRead + AsyncWrite + 'static, Io2: AsyncRead + AsyncWrite + 'static, - T1: Service, - T2: Service, + T1: Service, + T2: Service, { pub(crate) tcp_pool: ConnectionPool, pub(crate) ssl_pool: ConnectionPool, @@ -343,9 +336,9 @@ mod connect_impl { where Io1: AsyncRead + AsyncWrite + 'static, Io2: AsyncRead + AsyncWrite + 'static, - T1: Service + T1: Service + Clone, - T2: Service + T2: Service + Clone, { fn clone(&self) -> Self { @@ -360,10 +353,10 @@ mod connect_impl { where Io1: AsyncRead + AsyncWrite + 'static, Io2: AsyncRead + AsyncWrite + 'static, - T1: Service, - T2: Service, + T1: Service, + T2: Service, { - type Request = Connect; + type Request = Uri; type Response = EitherConnection; type Error = ConnectError; type Future = Either< @@ -378,17 +371,18 @@ mod connect_impl { self.tcp_pool.poll_ready() } - fn call(&mut self, req: Connect) -> Self::Future { - if req.is_secure() { - Either::B(Either::B(InnerConnectorResponseB { - fut: self.ssl_pool.call(req), - _t: PhantomData, - })) - } else { - Either::B(Either::A(InnerConnectorResponseA { + fn call(&mut self, req: Uri) -> Self::Future { + match req.scheme_str() { + Some("https") | Some("wss") => { + Either::B(Either::B(InnerConnectorResponseB { + fut: self.ssl_pool.call(req), + _t: PhantomData, + })) + } + _ => Either::B(Either::A(InnerConnectorResponseA { fut: self.tcp_pool.call(req), _t: PhantomData, - })) + })), } } } @@ -396,7 +390,7 @@ mod connect_impl { pub(crate) struct InnerConnectorResponseA where Io1: AsyncRead + AsyncWrite + 'static, - T: Service, + T: Service, { fut: as Service>::Future, _t: PhantomData, @@ -404,7 +398,7 @@ mod connect_impl { impl Future for InnerConnectorResponseA where - T: Service, + T: Service, Io1: AsyncRead + AsyncWrite + 'static, Io2: AsyncRead + AsyncWrite + 'static, { @@ -422,7 +416,7 @@ mod connect_impl { pub(crate) struct InnerConnectorResponseB where Io2: AsyncRead + AsyncWrite + 'static, - T: Service, + T: Service, { fut: as Service>::Future, _t: PhantomData, @@ -430,7 +424,7 @@ mod connect_impl { impl Future for InnerConnectorResponseB where - T: Service, + T: Service, Io1: AsyncRead + AsyncWrite + 'static, Io2: AsyncRead + AsyncWrite + 'static, { diff --git a/src/client/mod.rs b/src/client/mod.rs index 0bff97e4..86b1a0cc 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1,5 +1,4 @@ //! Http client api -mod connect; mod connection; mod connector; mod error; @@ -9,7 +8,6 @@ mod pool; mod request; mod response; -pub use self::connect::Connect; pub use self::connection::Connection; pub use self::connector::Connector; pub use self::error::{ConnectError, InvalidUrl, SendRequestError}; diff --git a/src/client/pool.rs b/src/client/pool.rs index 214b7a38..a94b1e52 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -7,18 +7,17 @@ use std::time::{Duration, Instant}; use actix_codec::{AsyncRead, AsyncWrite}; use actix_service::Service; use bytes::Bytes; -use futures::future::{ok, Either, FutureResult}; +use futures::future::{err, ok, Either, FutureResult}; use futures::task::AtomicTask; use futures::unsync::oneshot; use futures::{Async, Future, Poll}; use h2::client::{handshake, Handshake}; use hashbrown::HashMap; -use http::uri::Authority; +use http::uri::{Authority, Uri}; use indexmap::IndexSet; use slab::Slab; use tokio_timer::{sleep, Delay}; -use super::connect::Connect; use super::connection::{ConnectionType, IoConnection}; use super::error::ConnectError; @@ -48,7 +47,7 @@ pub(crate) struct ConnectionPool( impl ConnectionPool where Io: AsyncRead + AsyncWrite + 'static, - T: Service, + T: Service, { pub(crate) fn new( connector: T, @@ -87,9 +86,9 @@ where impl Service for ConnectionPool where Io: AsyncRead + AsyncWrite + 'static, - T: Service, + T: Service, { - type Request = Connect; + type Request = Uri; type Response = IoConnection; type Error = ConnectError; type Future = Either< @@ -101,8 +100,12 @@ where self.0.poll_ready() } - fn call(&mut self, req: Connect) -> Self::Future { - let key = req.key(); + fn call(&mut self, req: Uri) -> Self::Future { + let key = if let Some(authority) = req.authority_part() { + authority.clone().into() + } else { + return Either::A(err(ConnectError::Unresolverd)); + }; // acquire connection match self.1.as_ref().borrow_mut().acquire(&key) { @@ -268,110 +271,6 @@ where } } -// struct OpenWaitingConnection -// where -// Io: AsyncRead + AsyncWrite + 'static, -// { -// fut: F, -// key: Key, -// h2: Option>, -// rx: Option, ConnectorError>>>, -// inner: Option>>>, -// } - -// impl OpenWaitingConnection -// where -// F: Future + 'static, -// Io: AsyncRead + AsyncWrite + 'static, -// { -// fn spawn( -// key: Key, -// rx: oneshot::Sender, ConnectorError>>, -// inner: Rc>>, -// fut: F, -// ) { -// tokio_current_thread::spawn(OpenWaitingConnection { -// key, -// fut, -// h2: None, -// rx: Some(rx), -// inner: Some(inner), -// }) -// } -// } - -// impl Drop for OpenWaitingConnection -// where -// Io: AsyncRead + AsyncWrite + 'static, -// { -// fn drop(&mut self) { -// if let Some(inner) = self.inner.take() { -// let mut inner = inner.as_ref().borrow_mut(); -// inner.release(); -// inner.check_availibility(); -// } -// } -// } - -// impl Future for OpenWaitingConnection -// where -// F: Future, -// Io: AsyncRead + AsyncWrite, -// { -// type Item = (); -// type Error = (); - -// fn poll(&mut self) -> Poll { -// if let Some(ref mut h2) = self.h2 { -// return match h2.poll() { -// Ok(Async::Ready((snd, connection))) => { -// tokio_current_thread::spawn(connection.map_err(|_| ())); -// let _ = self.rx.take().unwrap().send(Ok(IoConnection::new( -// ConnectionType::H2(snd), -// Instant::now(), -// Some(Acquired(self.key.clone(), self.inner.clone())), -// ))); -// Ok(Async::Ready(())) -// } -// Ok(Async::NotReady) => Ok(Async::NotReady), -// Err(e) => { -// let _ = self.inner.take(); -// if let Some(rx) = self.rx.take() { -// let _ = rx.send(Err(e.into())); -// } - -// Err(()) -// } -// }; -// } - -// match self.fut.poll() { -// Err(err) => { -// let _ = self.inner.take(); -// if let Some(rx) = self.rx.take() { -// let _ = rx.send(Err(err)); -// } -// Err(()) -// } -// Ok(Async::Ready((_, io, proto))) => { -// let _ = self.inner.take(); -// if proto == Protocol::Http1 { -// let _ = self.rx.take().unwrap().send(Ok(IoConnection::new( -// ConnectionType::H1(io), -// Instant::now(), -// Some(Acquired(self.key.clone(), self.inner.clone())), -// ))); -// } else { -// self.h2 = Some(handshake(io)); -// return self.poll(); -// } -// Ok(Async::Ready(())) -// } -// Ok(Async::NotReady) => Ok(Async::NotReady), -// } -// } -// } - enum Acquire { Acquired(ConnectionType, Instant), Available, @@ -392,10 +291,7 @@ pub(crate) struct Inner { limit: usize, acquired: usize, available: HashMap>>, - waiters: Slab<( - Connect, - oneshot::Sender, ConnectError>>, - )>, + waiters: Slab<(Uri, oneshot::Sender, ConnectError>>)>, waiters_queue: IndexSet<(Key, usize)>, task: AtomicTask, } @@ -434,14 +330,14 @@ where /// connection is not available, wait fn wait_for( &mut self, - connect: Connect, + connect: Uri, ) -> ( oneshot::Receiver, ConnectError>>, usize, ) { let (tx, rx) = oneshot::channel(); - let key = connect.key(); + let key: Key = connect.authority_part().unwrap().clone().into(); let entry = self.waiters.vacant_entry(); let token = entry.key(); entry.insert((connect, tx)); diff --git a/src/client/request.rs b/src/client/request.rs index 199e13b9..7c7079fb 100644 --- a/src/client/request.rs +++ b/src/client/request.rs @@ -21,8 +21,8 @@ use crate::http::{ use crate::message::{ConnectionType, Head, RequestHead}; use super::connection::Connection; +use super::error::{ConnectError, InvalidUrl, SendRequestError}; use super::response::ClientResponse; -use super::{Connect, ConnectError, SendRequestError}; /// An HTTP Client Request /// @@ -180,23 +180,32 @@ where ) -> impl Future where B: 'static, - T: Service, + T: Service, I: Connection, { let Self { head, body } = self; - let connect = Connect::new(head.uri.clone()); - if let Err(e) = connect.validate() { - Either::A(err(e.into())) + let uri = head.uri.clone(); + + // validate uri + if uri.host().is_none() { + Either::A(err(InvalidUrl::MissingHost.into())) + } else if uri.scheme_part().is_none() { + Either::A(err(InvalidUrl::MissingScheme.into())) + } else if let Some(scheme) = uri.scheme_part() { + match scheme.as_str() { + "http" | "ws" | "https" | "wss" => Either::B( + connector + // connect to the host + .call(uri) + .from_err() + // send request + .and_then(move |connection| connection.send_request(head, body)), + ), + _ => Either::A(err(InvalidUrl::UnknownScheme.into())), + } } else { - Either::B( - connector - // connect to the host - .call(connect) - .from_err() - // send request - .and_then(move |connection| connection.send_request(head, body)), - ) + Either::A(err(InvalidUrl::UnknownScheme.into())) } } } @@ -529,7 +538,7 @@ impl ClientRequestBuilder { if !parts.headers.contains_key(header::HOST) { let mut wrt = BytesMut::with_capacity(host.len() + 5).writer(); - let _ = match parts.uri.port() { + let _ = match parts.uri.port_u16() { None | Some(80) | Some(443) => write!(wrt, "{}", host), Some(port) => write!(wrt, "{}:{}", host, port), }; diff --git a/src/ws/client/service.rs b/src/ws/client/service.rs index 1aa39124..a0a9b203 100644 --- a/src/ws/client/service.rs +++ b/src/ws/client/service.rs @@ -2,7 +2,7 @@ use std::marker::PhantomData; use actix_codec::{AsyncRead, AsyncWrite, Framed}; -use actix_connect::{default_connector, Address, Connect as TcpConnect, ConnectError}; +use actix_connect::{default_connector, Connect as TcpConnect, ConnectError}; use actix_service::{apply_fn, Service}; use base64; use futures::future::{err, Either, FutureResult}; @@ -131,7 +131,7 @@ where // prep connection let connect = TcpConnect::new(request.uri().host().unwrap().to_string()) - .set_port(request.uri().port().unwrap_or_else(|| proto.port())); + .set_port(request.uri().port_u16().unwrap_or_else(|| proto.port())); let fut = Box::new( self.connector diff --git a/test-server/src/lib.rs b/test-server/src/lib.rs index 3bb5feff..26bca787 100644 --- a/test-server/src/lib.rs +++ b/test-server/src/lib.rs @@ -5,10 +5,10 @@ use std::{net, thread, time}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_http::body::MessageBody; use actix_http::client::{ - ClientRequest, ClientRequestBuilder, ClientResponse, Connect, ConnectError, - Connection, Connector, SendRequestError, + ClientRequest, ClientRequestBuilder, ClientResponse, ConnectError, Connection, + Connector, SendRequestError, }; -use actix_http::ws; +use actix_http::{http::Uri, ws}; use actix_rt::{Runtime, System}; use actix_server::{Server, StreamServiceFactory}; use actix_service::Service; @@ -158,8 +158,8 @@ impl TestServerRuntime { } fn new_connector( - ) -> impl Service - + Clone { + ) -> impl Service + Clone + { #[cfg(feature = "ssl")] { use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode}; @@ -185,8 +185,8 @@ impl TestServerRuntime { /// Http connector pub fn connector( &mut self, - ) -> impl Service - + Clone { + ) -> impl Service + Clone + { self.execute(|| TestServerRuntime::new_connector()) }