From fc9b14a933f49c40fa43f31335913e6e92a701d7 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 19 Apr 2019 18:03:44 -0700 Subject: [PATCH] allow to specify server address for http and ws requests --- actix-http/CHANGES.md | 2 + actix-http/Cargo.toml | 2 +- actix-http/src/client/connector.rs | 106 ++++++++++++++--------------- actix-http/src/client/mod.rs | 8 +++ actix-http/src/client/pool.rs | 22 +++--- awc/CHANGES.md | 6 +- awc/Cargo.toml | 2 +- awc/src/builder.rs | 6 +- awc/src/connect.rs | 24 +++++-- awc/src/request.rs | 15 +++- awc/src/ws.rs | 14 +++- 11 files changed, 129 insertions(+), 78 deletions(-) diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index a82b864b3..fc33ff411 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -8,6 +8,8 @@ * Cookie::max_age_time() accepts value in time::Duration +* Allow to specify server address for client connector + ## [0.1.0] - 2019-04-16 diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index b94e12d87..1f8056b36 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -46,7 +46,7 @@ secure-cookies = ["ring"] [dependencies] actix-service = "0.3.6" actix-codec = "0.1.2" -actix-connect = "0.1.4" +actix-connect = "0.1.5" actix-utils = "0.3.5" actix-server-config = "0.1.1" actix-threadpool = "0.1.0" diff --git a/actix-http/src/client/connector.rs b/actix-http/src/client/connector.rs index ed6207f9f..639afb755 100644 --- a/actix-http/src/client/connector.rs +++ b/actix-http/src/client/connector.rs @@ -14,6 +14,7 @@ use tokio_tcp::TcpStream; use super::connection::Connection; use super::error::ConnectError; use super::pool::{ConnectionPool, Protocol}; +use super::Connect; #[cfg(feature = "ssl")] use openssl::ssl::SslConnector; @@ -177,15 +178,17 @@ where /// its combinator chain. pub fn finish( self, - ) -> impl Service + Clone - { + ) -> impl Service + + Clone { #[cfg(not(feature = "ssl"))] { let connector = TimeoutService::new( self.timeout, - apply_fn(self.connector, |msg: Uri, srv| srv.call(msg.into())) - .map_err(ConnectError::from) - .map(|stream| (stream.into_parts().0, Protocol::Http1)), + apply_fn(self.connector, |msg: Connect, srv| { + srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr)) + }) + .map_err(ConnectError::from) + .map(|stream| (stream.into_parts().0, Protocol::Http1)), ) .map_err(|e| match e { TimeoutError::Service(e) => e, @@ -209,26 +212,28 @@ where let ssl_service = TimeoutService::new( self.timeout, - apply_fn(self.connector.clone(), |msg: Uri, srv| srv.call(msg.into())) - .map_err(ConnectError::from) - .and_then( - OpensslConnector::service(self.ssl) - .map_err(ConnectError::from) - .map(|stream| { - let sock = stream.into_parts().0; - let h2 = sock - .get_ref() - .ssl() - .selected_alpn_protocol() - .map(|protos| protos.windows(2).any(|w| w == H2)) - .unwrap_or(false); - if h2 { - (sock, Protocol::Http2) - } else { - (sock, Protocol::Http1) - } - }), - ), + apply_fn(self.connector.clone(), |msg: Connect, srv| { + srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr)) + }) + .map_err(ConnectError::from) + .and_then( + OpensslConnector::service(self.ssl) + .map_err(ConnectError::from) + .map(|stream| { + let sock = stream.into_parts().0; + let h2 = sock + .get_ref() + .ssl() + .selected_alpn_protocol() + .map(|protos| protos.windows(2).any(|w| w == H2)) + .unwrap_or(false); + if h2 { + (sock, Protocol::Http2) + } else { + (sock, Protocol::Http1) + } + }), + ), ) .map_err(|e| match e { TimeoutError::Service(e) => e, @@ -237,9 +242,11 @@ where let tcp_service = TimeoutService::new( self.timeout, - apply_fn(self.connector.clone(), |msg: Uri, srv| srv.call(msg.into())) - .map_err(ConnectError::from) - .map(|stream| (stream.into_parts().0, Protocol::Http1)), + apply_fn(self.connector.clone(), |msg: Connect, srv| { + srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr)) + }) + .map_err(ConnectError::from) + .map(|stream| (stream.into_parts().0, Protocol::Http1)), ) .map_err(|e| match e { TimeoutError::Service(e) => e, @@ -264,15 +271,6 @@ where } } } - - #[doc(hidden)] - #[deprecated(since = "0.1.0-alpha4", note = "please use `.finish()` method")] - pub fn service( - self, - ) -> impl Service + Clone - { - self.finish() - } } #[cfg(not(feature = "ssl"))] @@ -286,7 +284,7 @@ mod connect_impl { pub(crate) struct InnerConnector where Io: AsyncRead + AsyncWrite + 'static, - T: Service, + T: Service, { pub(crate) tcp_pool: ConnectionPool, } @@ -294,7 +292,7 @@ mod connect_impl { impl Clone for InnerConnector where Io: AsyncRead + AsyncWrite + 'static, - T: Service + T: Service + Clone, { fn clone(&self) -> Self { @@ -307,9 +305,9 @@ mod connect_impl { impl Service for InnerConnector where Io: AsyncRead + AsyncWrite + 'static, - T: Service, + T: Service, { - type Request = Uri; + type Request = Connect; type Response = IoConnection; type Error = ConnectError; type Future = Either< @@ -346,8 +344,8 @@ mod connect_impl { where Io1: AsyncRead + AsyncWrite + 'static, Io2: AsyncRead + AsyncWrite + 'static, - T1: Service, - T2: Service, + T1: Service, + T2: Service, { pub(crate) tcp_pool: ConnectionPool, pub(crate) ssl_pool: ConnectionPool, @@ -357,9 +355,9 @@ mod connect_impl { where Io1: AsyncRead + AsyncWrite + 'static, Io2: AsyncRead + AsyncWrite + 'static, - T1: Service + T1: Service + Clone, - T2: Service + T2: Service + Clone, { fn clone(&self) -> Self { @@ -374,10 +372,10 @@ mod connect_impl { where Io1: AsyncRead + AsyncWrite + 'static, Io2: AsyncRead + AsyncWrite + 'static, - T1: Service, - T2: Service, + T1: Service, + T2: Service, { - type Request = Uri; + type Request = Connect; type Response = EitherConnection; type Error = ConnectError; type Future = Either< @@ -392,8 +390,8 @@ mod connect_impl { self.tcp_pool.poll_ready() } - fn call(&mut self, req: Uri) -> Self::Future { - match req.scheme_str() { + fn call(&mut self, req: Connect) -> Self::Future { + match req.uri.scheme_str() { Some("https") | Some("wss") => { Either::B(Either::B(InnerConnectorResponseB { fut: self.ssl_pool.call(req), @@ -411,7 +409,7 @@ mod connect_impl { pub(crate) struct InnerConnectorResponseA where Io1: AsyncRead + AsyncWrite + 'static, - T: Service, + T: Service, { fut: as Service>::Future, _t: PhantomData, @@ -419,7 +417,7 @@ mod connect_impl { impl Future for InnerConnectorResponseA where - T: Service, + T: Service, Io1: AsyncRead + AsyncWrite + 'static, Io2: AsyncRead + AsyncWrite + 'static, { @@ -437,7 +435,7 @@ mod connect_impl { pub(crate) struct InnerConnectorResponseB where Io2: AsyncRead + AsyncWrite + 'static, - T: Service, + T: Service, { fut: as Service>::Future, _t: PhantomData, @@ -445,7 +443,7 @@ mod connect_impl { impl Future for InnerConnectorResponseB where - T: Service, + T: Service, Io1: AsyncRead + AsyncWrite + 'static, Io2: AsyncRead + AsyncWrite + 'static, { diff --git a/actix-http/src/client/mod.rs b/actix-http/src/client/mod.rs index cf526e25e..1d10117cd 100644 --- a/actix-http/src/client/mod.rs +++ b/actix-http/src/client/mod.rs @@ -1,4 +1,6 @@ //! Http client api +use http::Uri; + mod connection; mod connector; mod error; @@ -10,3 +12,9 @@ pub use self::connection::Connection; pub use self::connector::Connector; pub use self::error::{ConnectError, InvalidUrl, SendRequestError}; pub use self::pool::Protocol; + +#[derive(Clone)] +pub struct Connect { + pub uri: Uri, + pub addr: Option, +} diff --git a/actix-http/src/client/pool.rs b/actix-http/src/client/pool.rs index 68ac6fbc8..7b138dbaa 100644 --- a/actix-http/src/client/pool.rs +++ b/actix-http/src/client/pool.rs @@ -13,13 +13,14 @@ use futures::unsync::oneshot; use futures::{Async, Future, Poll}; use h2::client::{handshake, Handshake}; use hashbrown::HashMap; -use http::uri::{Authority, Uri}; +use http::uri::Authority; use indexmap::IndexSet; use slab::Slab; use tokio_timer::{sleep, Delay}; use super::connection::{ConnectionType, IoConnection}; use super::error::ConnectError; +use super::Connect; #[derive(Clone, Copy, PartialEq)] /// Protocol version @@ -48,7 +49,7 @@ pub(crate) struct ConnectionPool( impl ConnectionPool where Io: AsyncRead + AsyncWrite + 'static, - T: Service, + T: Service, { pub(crate) fn new( connector: T, @@ -87,9 +88,9 @@ where impl Service for ConnectionPool where Io: AsyncRead + AsyncWrite + 'static, - T: Service, + T: Service, { - type Request = Uri; + type Request = Connect; type Response = IoConnection; type Error = ConnectError; type Future = Either< @@ -101,8 +102,8 @@ where self.0.poll_ready() } - fn call(&mut self, req: Uri) -> Self::Future { - let key = if let Some(authority) = req.authority_part() { + fn call(&mut self, req: Connect) -> Self::Future { + let key = if let Some(authority) = req.uri.authority_part() { authority.clone().into() } else { return Either::A(err(ConnectError::Unresolverd)); @@ -292,7 +293,10 @@ pub(crate) struct Inner { limit: usize, acquired: usize, available: HashMap>>, - waiters: Slab<(Uri, oneshot::Sender, ConnectError>>)>, + waiters: Slab<( + Connect, + oneshot::Sender, ConnectError>>, + )>, waiters_queue: IndexSet<(Key, usize)>, task: AtomicTask, } @@ -331,14 +335,14 @@ where /// connection is not available, wait fn wait_for( &mut self, - connect: Uri, + connect: Connect, ) -> ( oneshot::Receiver, ConnectError>>, usize, ) { let (tx, rx) = oneshot::channel(); - let key: Key = connect.authority_part().unwrap().clone().into(); + let key: Key = connect.uri.authority_part().unwrap().clone().into(); let entry = self.waiters.vacant_entry(); let token = entry.key(); entry.insert((connect, tx)); diff --git a/awc/CHANGES.md b/awc/CHANGES.md index a4f43d292..30fd4a6d2 100644 --- a/awc/CHANGES.md +++ b/awc/CHANGES.md @@ -1,6 +1,10 @@ # Changes -## [0.1.1] - 2019-04-xx +## [0.1.1] - 2019-04-19 + +### Added + +* Allow to specify server address for http and ws requests. ### Changed diff --git a/awc/Cargo.toml b/awc/Cargo.toml index bbc3d9287..b8c4f9898 100644 --- a/awc/Cargo.toml +++ b/awc/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "awc" -version = "0.1.0" +version = "0.1.1" authors = ["Nikolay Kim "] description = "Actix http client." readme = "README.md" diff --git a/awc/src/builder.rs b/awc/src/builder.rs index ddefed439..c460f1357 100644 --- a/awc/src/builder.rs +++ b/awc/src/builder.rs @@ -3,8 +3,8 @@ use std::fmt; use std::rc::Rc; use std::time::Duration; -use actix_http::client::{ConnectError, Connection, Connector}; -use actix_http::http::{header, HeaderMap, HeaderName, HttpTryFrom, Uri}; +use actix_http::client::{Connect, ConnectError, Connection, Connector}; +use actix_http::http::{header, HeaderMap, HeaderName, HttpTryFrom}; use actix_service::Service; use crate::connect::ConnectorWrapper; @@ -40,7 +40,7 @@ impl ClientBuilder { /// Use custom connector service. pub fn connector(mut self, connector: T) -> Self where - T: Service + 'static, + T: Service + 'static, T::Response: Connection, ::Future: 'static, T::Future: 'static, diff --git a/awc/src/connect.rs b/awc/src/connect.rs index bfc9da05f..4b564d777 100644 --- a/awc/src/connect.rs +++ b/awc/src/connect.rs @@ -1,10 +1,12 @@ -use std::{fmt, io}; +use std::{fmt, io, net}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_http::body::Body; -use actix_http::client::{ConnectError, Connection, SendRequestError}; +use actix_http::client::{ + Connect as ClientConnect, ConnectError, Connection, SendRequestError, +}; use actix_http::h1::ClientCodec; -use actix_http::{http, RequestHead, ResponseHead}; +use actix_http::{RequestHead, ResponseHead}; use actix_service::Service; use futures::{Future, Poll}; @@ -17,12 +19,14 @@ pub(crate) trait Connect { &mut self, head: RequestHead, body: Body, + addr: Option, ) -> Box>; /// Send request, returns Response and Framed fn open_tunnel( &mut self, head: RequestHead, + addr: Option, ) -> Box< Future< Item = (ResponseHead, Framed), @@ -33,7 +37,7 @@ pub(crate) trait Connect { impl Connect for ConnectorWrapper where - T: Service, + T: Service, T::Response: Connection, ::Io: 'static, ::Future: 'static, @@ -44,11 +48,15 @@ where &mut self, head: RequestHead, body: Body, + addr: Option, ) -> Box> { Box::new( self.0 // connect to the host - .call(head.uri.clone()) + .call(ClientConnect { + uri: head.uri.clone(), + addr, + }) .from_err() // send request .and_then(move |connection| connection.send_request(head, body)) @@ -59,6 +67,7 @@ where fn open_tunnel( &mut self, head: RequestHead, + addr: Option, ) -> Box< Future< Item = (ResponseHead, Framed), @@ -68,7 +77,10 @@ where Box::new( self.0 // connect to the host - .call(head.uri.clone()) + .call(ClientConnect { + uri: head.uri.clone(), + addr, + }) .from_err() // send request .and_then(move |connection| connection.open_tunnel(head)) diff --git a/awc/src/request.rs b/awc/src/request.rs index a280dfce1..2e6032649 100644 --- a/awc/src/request.rs +++ b/awc/src/request.rs @@ -1,8 +1,8 @@ -use std::fmt; use std::fmt::Write as FmtWrite; use std::io::Write; use std::rc::Rc; use std::time::Duration; +use std::{fmt, net}; use bytes::{BufMut, Bytes, BytesMut}; use futures::future::{err, Either}; @@ -60,6 +60,7 @@ const HTTPS_ENCODING: &str = "gzip, deflate"; pub struct ClientRequest { pub(crate) head: RequestHead, err: Option, + addr: Option, cookies: Option, response_decompress: bool, timeout: Option, @@ -76,6 +77,7 @@ impl ClientRequest { config, head: RequestHead::default(), err: None, + addr: None, cookies: None, timeout: None, response_decompress: true, @@ -97,6 +99,15 @@ impl ClientRequest { self } + /// Set socket address of the server. + /// + /// This address is used for connection. If address is not + /// provided url's host name get resolved. + pub fn address(mut self, addr: net::SocketAddr) -> Self { + self.addr = Some(addr); + self + } + /// Set HTTP method of this request. #[inline] pub fn method(mut self, method: Method) -> Self { @@ -435,7 +446,7 @@ impl ClientRequest { let fut = config .connector .borrow_mut() - .send_request(head, body.into()) + .send_request(head, body.into(), slf.addr) .map(move |res| { res.map_body(|head, payload| { if response_decompress { diff --git a/awc/src/ws.rs b/awc/src/ws.rs index 028330ab4..94a90535b 100644 --- a/awc/src/ws.rs +++ b/awc/src/ws.rs @@ -1,5 +1,6 @@ //! Websockets client use std::fmt::Write as FmtWrite; +use std::net::SocketAddr; use std::rc::Rc; use std::{fmt, str}; @@ -29,6 +30,7 @@ pub struct WebsocketsRequest { err: Option, origin: Option, protocols: Option, + addr: Option, max_size: usize, server_mode: bool, cookies: Option, @@ -55,6 +57,7 @@ impl WebsocketsRequest { head, err, config, + addr: None, origin: None, protocols: None, max_size: 65_536, @@ -63,6 +66,15 @@ impl WebsocketsRequest { } } + /// Set socket address of the server. + /// + /// This address is used for connection. If address is not + /// provided url's host name get resolved. + pub fn address(mut self, addr: SocketAddr) -> Self { + self.addr = Some(addr); + self + } + /// Set supported websocket protocols pub fn protocols(mut self, protos: U) -> Self where @@ -274,7 +286,7 @@ impl WebsocketsRequest { .config .connector .borrow_mut() - .open_tunnel(head) + .open_tunnel(head, self.addr) .from_err() .and_then(move |(head, framed)| { // verify response