mirror of
https://github.com/actix/actix-extras.git
synced 2025-06-28 02:49:02 +02:00
Merge actix-http project
This commit is contained in:
132
actix-http/src/client/connection.rs
Normal file
132
actix-http/src/client/connection.rs
Normal file
@ -0,0 +1,132 @@
|
||||
use std::{fmt, time};
|
||||
|
||||
use actix_codec::{AsyncRead, AsyncWrite};
|
||||
use bytes::Bytes;
|
||||
use futures::Future;
|
||||
use h2::client::SendRequest;
|
||||
|
||||
use crate::body::MessageBody;
|
||||
use crate::message::{RequestHead, ResponseHead};
|
||||
use crate::payload::Payload;
|
||||
|
||||
use super::error::SendRequestError;
|
||||
use super::pool::Acquired;
|
||||
use super::{h1proto, h2proto};
|
||||
|
||||
pub(crate) enum ConnectionType<Io> {
|
||||
H1(Io),
|
||||
H2(SendRequest<Bytes>),
|
||||
}
|
||||
|
||||
pub trait Connection {
|
||||
type Future: Future<Item = (ResponseHead, Payload), Error = SendRequestError>;
|
||||
|
||||
/// Send request and body
|
||||
fn send_request<B: MessageBody + 'static>(
|
||||
self,
|
||||
head: RequestHead,
|
||||
body: B,
|
||||
) -> Self::Future;
|
||||
}
|
||||
|
||||
pub(crate) trait ConnectionLifetime: AsyncRead + AsyncWrite + 'static {
|
||||
/// Close connection
|
||||
fn close(&mut self);
|
||||
|
||||
/// Release connection to the connection pool
|
||||
fn release(&mut self);
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
/// HTTP client connection
|
||||
pub struct IoConnection<T> {
|
||||
io: Option<ConnectionType<T>>,
|
||||
created: time::Instant,
|
||||
pool: Option<Acquired<T>>,
|
||||
}
|
||||
|
||||
impl<T> fmt::Debug for IoConnection<T>
|
||||
where
|
||||
T: fmt::Debug,
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
match self.io {
|
||||
Some(ConnectionType::H1(ref io)) => write!(f, "H1Connection({:?})", io),
|
||||
Some(ConnectionType::H2(_)) => write!(f, "H2Connection"),
|
||||
None => write!(f, "Connection(Empty)"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsyncRead + AsyncWrite + 'static> IoConnection<T> {
|
||||
pub(crate) fn new(
|
||||
io: ConnectionType<T>,
|
||||
created: time::Instant,
|
||||
pool: Option<Acquired<T>>,
|
||||
) -> Self {
|
||||
IoConnection {
|
||||
pool,
|
||||
created,
|
||||
io: Some(io),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn into_inner(self) -> (ConnectionType<T>, time::Instant) {
|
||||
(self.io.unwrap(), self.created)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Connection for IoConnection<T>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + 'static,
|
||||
{
|
||||
type Future = Box<Future<Item = (ResponseHead, Payload), Error = SendRequestError>>;
|
||||
|
||||
fn send_request<B: MessageBody + 'static>(
|
||||
mut self,
|
||||
head: RequestHead,
|
||||
body: B,
|
||||
) -> Self::Future {
|
||||
match self.io.take().unwrap() {
|
||||
ConnectionType::H1(io) => Box::new(h1proto::send_request(
|
||||
io,
|
||||
head,
|
||||
body,
|
||||
self.created,
|
||||
self.pool,
|
||||
)),
|
||||
ConnectionType::H2(io) => Box::new(h2proto::send_request(
|
||||
io,
|
||||
head,
|
||||
body,
|
||||
self.created,
|
||||
self.pool,
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub(crate) enum EitherConnection<A, B> {
|
||||
A(IoConnection<A>),
|
||||
B(IoConnection<B>),
|
||||
}
|
||||
|
||||
impl<A, B> Connection for EitherConnection<A, B>
|
||||
where
|
||||
A: AsyncRead + AsyncWrite + 'static,
|
||||
B: AsyncRead + AsyncWrite + 'static,
|
||||
{
|
||||
type Future = Box<Future<Item = (ResponseHead, Payload), Error = SendRequestError>>;
|
||||
|
||||
fn send_request<RB: MessageBody + 'static>(
|
||||
self,
|
||||
head: RequestHead,
|
||||
body: RB,
|
||||
) -> Self::Future {
|
||||
match self {
|
||||
EitherConnection::A(con) => con.send_request(head, body),
|
||||
EitherConnection::B(con) => con.send_request(head, body),
|
||||
}
|
||||
}
|
||||
}
|
442
actix-http/src/client/connector.rs
Normal file
442
actix-http/src/client/connector.rs
Normal file
@ -0,0 +1,442 @@
|
||||
use std::fmt;
|
||||
use std::marker::PhantomData;
|
||||
use std::time::Duration;
|
||||
|
||||
use actix_codec::{AsyncRead, AsyncWrite};
|
||||
use actix_connect::{
|
||||
default_connector, Connect as TcpConnect, Connection as TcpConnection,
|
||||
};
|
||||
use actix_service::{apply_fn, Service, ServiceExt};
|
||||
use actix_utils::timeout::{TimeoutError, TimeoutService};
|
||||
use http::Uri;
|
||||
use tokio_tcp::TcpStream;
|
||||
|
||||
use super::connection::Connection;
|
||||
use super::error::ConnectError;
|
||||
use super::pool::{ConnectionPool, Protocol};
|
||||
|
||||
#[cfg(feature = "ssl")]
|
||||
use openssl::ssl::SslConnector;
|
||||
|
||||
#[cfg(not(feature = "ssl"))]
|
||||
type SslConnector = ();
|
||||
|
||||
/// Http client connector builde instance.
|
||||
/// `Connector` type uses builder-like pattern for connector service construction.
|
||||
pub struct Connector<T, U> {
|
||||
connector: T,
|
||||
timeout: Duration,
|
||||
conn_lifetime: Duration,
|
||||
conn_keep_alive: Duration,
|
||||
disconnect_timeout: Duration,
|
||||
limit: usize,
|
||||
#[allow(dead_code)]
|
||||
ssl: SslConnector,
|
||||
_t: PhantomData<U>,
|
||||
}
|
||||
|
||||
impl Connector<(), ()> {
|
||||
pub fn new() -> Connector<
|
||||
impl Service<
|
||||
Request = TcpConnect<Uri>,
|
||||
Response = TcpConnection<Uri, TcpStream>,
|
||||
Error = actix_connect::ConnectError,
|
||||
> + Clone,
|
||||
TcpStream,
|
||||
> {
|
||||
let ssl = {
|
||||
#[cfg(feature = "ssl")]
|
||||
{
|
||||
use log::error;
|
||||
use openssl::ssl::{SslConnector, SslMethod};
|
||||
|
||||
let mut ssl = SslConnector::builder(SslMethod::tls()).unwrap();
|
||||
let _ = ssl
|
||||
.set_alpn_protos(b"\x02h2\x08http/1.1")
|
||||
.map_err(|e| error!("Can not set alpn protocol: {:?}", e));
|
||||
ssl.build()
|
||||
}
|
||||
#[cfg(not(feature = "ssl"))]
|
||||
{}
|
||||
};
|
||||
|
||||
Connector {
|
||||
ssl,
|
||||
connector: default_connector(),
|
||||
timeout: Duration::from_secs(1),
|
||||
conn_lifetime: Duration::from_secs(75),
|
||||
conn_keep_alive: Duration::from_secs(15),
|
||||
disconnect_timeout: Duration::from_millis(3000),
|
||||
limit: 100,
|
||||
_t: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U> Connector<T, U> {
|
||||
/// Use custom connector.
|
||||
pub fn connector<T1, U1>(self, connector: T1) -> Connector<T1, U1>
|
||||
where
|
||||
U1: AsyncRead + AsyncWrite + fmt::Debug,
|
||||
T1: Service<
|
||||
Request = TcpConnect<Uri>,
|
||||
Response = TcpConnection<Uri, U1>,
|
||||
Error = actix_connect::ConnectError,
|
||||
> + Clone,
|
||||
{
|
||||
Connector {
|
||||
connector,
|
||||
timeout: self.timeout,
|
||||
conn_lifetime: self.conn_lifetime,
|
||||
conn_keep_alive: self.conn_keep_alive,
|
||||
disconnect_timeout: self.disconnect_timeout,
|
||||
limit: self.limit,
|
||||
ssl: self.ssl,
|
||||
_t: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U> Connector<T, U>
|
||||
where
|
||||
U: AsyncRead + AsyncWrite + fmt::Debug + 'static,
|
||||
T: Service<
|
||||
Request = TcpConnect<Uri>,
|
||||
Response = TcpConnection<Uri, U>,
|
||||
Error = actix_connect::ConnectError,
|
||||
> + Clone,
|
||||
{
|
||||
/// Connection timeout, i.e. max time to connect to remote host including dns name resolution.
|
||||
/// Set to 1 second by default.
|
||||
pub fn timeout(mut self, timeout: Duration) -> Self {
|
||||
self.timeout = timeout;
|
||||
self
|
||||
}
|
||||
|
||||
#[cfg(feature = "ssl")]
|
||||
/// Use custom `SslConnector` instance.
|
||||
pub fn ssl(mut self, connector: SslConnector) -> Self {
|
||||
self.ssl = connector;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set total number of simultaneous connections per type of scheme.
|
||||
///
|
||||
/// If limit is 0, the connector has no limit.
|
||||
/// The default limit size is 100.
|
||||
pub fn limit(mut self, limit: usize) -> Self {
|
||||
self.limit = limit;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set keep-alive period for opened connection.
|
||||
///
|
||||
/// Keep-alive period is the period between connection usage. If
|
||||
/// the delay between repeated usages of the same connection
|
||||
/// exceeds this period, the connection is closed.
|
||||
/// Default keep-alive period is 15 seconds.
|
||||
pub fn conn_keep_alive(mut self, dur: Duration) -> Self {
|
||||
self.conn_keep_alive = dur;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set max lifetime period for connection.
|
||||
///
|
||||
/// Connection lifetime is max lifetime of any opened connection
|
||||
/// until it is closed regardless of keep-alive period.
|
||||
/// Default lifetime period is 75 seconds.
|
||||
pub fn conn_lifetime(mut self, dur: Duration) -> Self {
|
||||
self.conn_lifetime = dur;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set server connection disconnect timeout in milliseconds.
|
||||
///
|
||||
/// Defines a timeout for disconnect connection. If a disconnect procedure does not complete
|
||||
/// within this time, the socket get dropped. This timeout affects only secure connections.
|
||||
///
|
||||
/// To disable timeout set value to 0.
|
||||
///
|
||||
/// By default disconnect timeout is set to 3000 milliseconds.
|
||||
pub fn disconnect_timeout(mut self, dur: Duration) -> Self {
|
||||
self.disconnect_timeout = dur;
|
||||
self
|
||||
}
|
||||
|
||||
/// Finish configuration process and create connector service.
|
||||
pub fn service(
|
||||
self,
|
||||
) -> impl Service<Request = Uri, Response = impl Connection, Error = ConnectError> + Clone
|
||||
{
|
||||
#[cfg(not(feature = "ssl"))]
|
||||
{
|
||||
let connector = TimeoutService::new(
|
||||
self.timeout,
|
||||
apply_fn(self.connector, |msg: Uri, srv| srv.call(msg.into()))
|
||||
.map_err(ConnectError::from)
|
||||
.map(|stream| (stream.into_parts().0, Protocol::Http1)),
|
||||
)
|
||||
.map_err(|e| match e {
|
||||
TimeoutError::Service(e) => e,
|
||||
TimeoutError::Timeout => ConnectError::Timeout,
|
||||
});
|
||||
|
||||
connect_impl::InnerConnector {
|
||||
tcp_pool: ConnectionPool::new(
|
||||
connector,
|
||||
self.conn_lifetime,
|
||||
self.conn_keep_alive,
|
||||
None,
|
||||
self.limit,
|
||||
),
|
||||
}
|
||||
}
|
||||
#[cfg(feature = "ssl")]
|
||||
{
|
||||
const H2: &[u8] = b"h2";
|
||||
use actix_connect::ssl::OpensslConnector;
|
||||
|
||||
let ssl_service = TimeoutService::new(
|
||||
self.timeout,
|
||||
apply_fn(self.connector.clone(), |msg: Uri, srv| srv.call(msg.into()))
|
||||
.map_err(ConnectError::from)
|
||||
.and_then(
|
||||
OpensslConnector::service(self.ssl)
|
||||
.map_err(ConnectError::from)
|
||||
.map(|stream| {
|
||||
let sock = stream.into_parts().0;
|
||||
let h2 = sock
|
||||
.get_ref()
|
||||
.ssl()
|
||||
.selected_alpn_protocol()
|
||||
.map(|protos| protos.windows(2).any(|w| w == H2))
|
||||
.unwrap_or(false);
|
||||
if h2 {
|
||||
(sock, Protocol::Http2)
|
||||
} else {
|
||||
(sock, Protocol::Http1)
|
||||
}
|
||||
}),
|
||||
),
|
||||
)
|
||||
.map_err(|e| match e {
|
||||
TimeoutError::Service(e) => e,
|
||||
TimeoutError::Timeout => ConnectError::Timeout,
|
||||
});
|
||||
|
||||
let tcp_service = TimeoutService::new(
|
||||
self.timeout,
|
||||
apply_fn(self.connector.clone(), |msg: Uri, srv| srv.call(msg.into()))
|
||||
.map_err(ConnectError::from)
|
||||
.map(|stream| (stream.into_parts().0, Protocol::Http1)),
|
||||
)
|
||||
.map_err(|e| match e {
|
||||
TimeoutError::Service(e) => e,
|
||||
TimeoutError::Timeout => ConnectError::Timeout,
|
||||
});
|
||||
|
||||
connect_impl::InnerConnector {
|
||||
tcp_pool: ConnectionPool::new(
|
||||
tcp_service,
|
||||
self.conn_lifetime,
|
||||
self.conn_keep_alive,
|
||||
None,
|
||||
self.limit,
|
||||
),
|
||||
ssl_pool: ConnectionPool::new(
|
||||
ssl_service,
|
||||
self.conn_lifetime,
|
||||
self.conn_keep_alive,
|
||||
Some(self.disconnect_timeout),
|
||||
self.limit,
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "ssl"))]
|
||||
mod connect_impl {
|
||||
use futures::future::{err, Either, FutureResult};
|
||||
use futures::Poll;
|
||||
|
||||
use super::*;
|
||||
use crate::client::connection::IoConnection;
|
||||
|
||||
pub(crate) struct InnerConnector<T, Io>
|
||||
where
|
||||
Io: AsyncRead + AsyncWrite + 'static,
|
||||
T: Service<Request = Uri, Response = (Io, Protocol), Error = ConnectError>,
|
||||
{
|
||||
pub(crate) tcp_pool: ConnectionPool<T, Io>,
|
||||
}
|
||||
|
||||
impl<T, Io> Clone for InnerConnector<T, Io>
|
||||
where
|
||||
Io: AsyncRead + AsyncWrite + 'static,
|
||||
T: Service<Request = Uri, Response = (Io, Protocol), Error = ConnectError>
|
||||
+ Clone,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
InnerConnector {
|
||||
tcp_pool: self.tcp_pool.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, Io> Service for InnerConnector<T, Io>
|
||||
where
|
||||
Io: AsyncRead + AsyncWrite + 'static,
|
||||
T: Service<Request = Uri, Response = (Io, Protocol), Error = ConnectError>,
|
||||
{
|
||||
type Request = Uri;
|
||||
type Response = IoConnection<Io>;
|
||||
type Error = ConnectError;
|
||||
type Future = Either<
|
||||
<ConnectionPool<T, Io> as Service>::Future,
|
||||
FutureResult<IoConnection<Io>, ConnectError>,
|
||||
>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
self.tcp_pool.poll_ready()
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Uri) -> Self::Future {
|
||||
match req.scheme_str() {
|
||||
Some("https") | Some("wss") => {
|
||||
Either::B(err(ConnectError::SslIsNotSupported))
|
||||
}
|
||||
_ => Either::A(self.tcp_pool.call(req)),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "ssl")]
|
||||
mod connect_impl {
|
||||
use std::marker::PhantomData;
|
||||
|
||||
use futures::future::{Either, FutureResult};
|
||||
use futures::{Async, Future, Poll};
|
||||
|
||||
use super::*;
|
||||
use crate::client::connection::EitherConnection;
|
||||
|
||||
pub(crate) struct InnerConnector<T1, T2, Io1, Io2>
|
||||
where
|
||||
Io1: AsyncRead + AsyncWrite + 'static,
|
||||
Io2: AsyncRead + AsyncWrite + 'static,
|
||||
T1: Service<Request = Uri, Response = (Io1, Protocol), Error = ConnectError>,
|
||||
T2: Service<Request = Uri, Response = (Io2, Protocol), Error = ConnectError>,
|
||||
{
|
||||
pub(crate) tcp_pool: ConnectionPool<T1, Io1>,
|
||||
pub(crate) ssl_pool: ConnectionPool<T2, Io2>,
|
||||
}
|
||||
|
||||
impl<T1, T2, Io1, Io2> Clone for InnerConnector<T1, T2, Io1, Io2>
|
||||
where
|
||||
Io1: AsyncRead + AsyncWrite + 'static,
|
||||
Io2: AsyncRead + AsyncWrite + 'static,
|
||||
T1: Service<Request = Uri, Response = (Io1, Protocol), Error = ConnectError>
|
||||
+ Clone,
|
||||
T2: Service<Request = Uri, Response = (Io2, Protocol), Error = ConnectError>
|
||||
+ Clone,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
InnerConnector {
|
||||
tcp_pool: self.tcp_pool.clone(),
|
||||
ssl_pool: self.ssl_pool.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T1, T2, Io1, Io2> Service for InnerConnector<T1, T2, Io1, Io2>
|
||||
where
|
||||
Io1: AsyncRead + AsyncWrite + 'static,
|
||||
Io2: AsyncRead + AsyncWrite + 'static,
|
||||
T1: Service<Request = Uri, Response = (Io1, Protocol), Error = ConnectError>,
|
||||
T2: Service<Request = Uri, Response = (Io2, Protocol), Error = ConnectError>,
|
||||
{
|
||||
type Request = Uri;
|
||||
type Response = EitherConnection<Io1, Io2>;
|
||||
type Error = ConnectError;
|
||||
type Future = Either<
|
||||
FutureResult<Self::Response, Self::Error>,
|
||||
Either<
|
||||
InnerConnectorResponseA<T1, Io1, Io2>,
|
||||
InnerConnectorResponseB<T2, Io1, Io2>,
|
||||
>,
|
||||
>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
self.tcp_pool.poll_ready()
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Uri) -> Self::Future {
|
||||
match req.scheme_str() {
|
||||
Some("https") | Some("wss") => {
|
||||
Either::B(Either::B(InnerConnectorResponseB {
|
||||
fut: self.ssl_pool.call(req),
|
||||
_t: PhantomData,
|
||||
}))
|
||||
}
|
||||
_ => Either::B(Either::A(InnerConnectorResponseA {
|
||||
fut: self.tcp_pool.call(req),
|
||||
_t: PhantomData,
|
||||
})),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct InnerConnectorResponseA<T, Io1, Io2>
|
||||
where
|
||||
Io1: AsyncRead + AsyncWrite + 'static,
|
||||
T: Service<Request = Uri, Response = (Io1, Protocol), Error = ConnectError>,
|
||||
{
|
||||
fut: <ConnectionPool<T, Io1> as Service>::Future,
|
||||
_t: PhantomData<Io2>,
|
||||
}
|
||||
|
||||
impl<T, Io1, Io2> Future for InnerConnectorResponseA<T, Io1, Io2>
|
||||
where
|
||||
T: Service<Request = Uri, Response = (Io1, Protocol), Error = ConnectError>,
|
||||
Io1: AsyncRead + AsyncWrite + 'static,
|
||||
Io2: AsyncRead + AsyncWrite + 'static,
|
||||
{
|
||||
type Item = EitherConnection<Io1, Io2>;
|
||||
type Error = ConnectError;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
match self.fut.poll()? {
|
||||
Async::NotReady => Ok(Async::NotReady),
|
||||
Async::Ready(res) => Ok(Async::Ready(EitherConnection::A(res))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct InnerConnectorResponseB<T, Io1, Io2>
|
||||
where
|
||||
Io2: AsyncRead + AsyncWrite + 'static,
|
||||
T: Service<Request = Uri, Response = (Io2, Protocol), Error = ConnectError>,
|
||||
{
|
||||
fut: <ConnectionPool<T, Io2> as Service>::Future,
|
||||
_t: PhantomData<Io1>,
|
||||
}
|
||||
|
||||
impl<T, Io1, Io2> Future for InnerConnectorResponseB<T, Io1, Io2>
|
||||
where
|
||||
T: Service<Request = Uri, Response = (Io2, Protocol), Error = ConnectError>,
|
||||
Io1: AsyncRead + AsyncWrite + 'static,
|
||||
Io2: AsyncRead + AsyncWrite + 'static,
|
||||
{
|
||||
type Item = EitherConnection<Io1, Io2>;
|
||||
type Error = ConnectError;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
match self.fut.poll()? {
|
||||
Async::NotReady => Ok(Async::NotReady),
|
||||
Async::Ready(res) => Ok(Async::Ready(EitherConnection::B(res))),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
124
actix-http/src/client/error.rs
Normal file
124
actix-http/src/client/error.rs
Normal file
@ -0,0 +1,124 @@
|
||||
use std::io;
|
||||
|
||||
use derive_more::{Display, From};
|
||||
use trust_dns_resolver::error::ResolveError;
|
||||
|
||||
#[cfg(feature = "ssl")]
|
||||
use openssl::ssl::{Error as SslError, HandshakeError};
|
||||
|
||||
use crate::error::{Error, ParseError, ResponseError};
|
||||
use crate::http::Error as HttpError;
|
||||
use crate::response::Response;
|
||||
|
||||
/// A set of errors that can occur while connecting to an HTTP host
|
||||
#[derive(Debug, Display, From)]
|
||||
pub enum ConnectError {
|
||||
/// SSL feature is not enabled
|
||||
#[display(fmt = "SSL is not supported")]
|
||||
SslIsNotSupported,
|
||||
|
||||
/// SSL error
|
||||
#[cfg(feature = "ssl")]
|
||||
#[display(fmt = "{}", _0)]
|
||||
SslError(SslError),
|
||||
|
||||
/// Failed to resolve the hostname
|
||||
#[display(fmt = "Failed resolving hostname: {}", _0)]
|
||||
Resolver(ResolveError),
|
||||
|
||||
/// No dns records
|
||||
#[display(fmt = "No dns records found for the input")]
|
||||
NoRecords,
|
||||
|
||||
/// Http2 error
|
||||
#[display(fmt = "{}", _0)]
|
||||
H2(h2::Error),
|
||||
|
||||
/// Connecting took too long
|
||||
#[display(fmt = "Timeout out while establishing connection")]
|
||||
Timeout,
|
||||
|
||||
/// Connector has been disconnected
|
||||
#[display(fmt = "Internal error: connector has been disconnected")]
|
||||
Disconnected,
|
||||
|
||||
/// Unresolved host name
|
||||
#[display(fmt = "Connector received `Connect` method with unresolved host")]
|
||||
Unresolverd,
|
||||
|
||||
/// Connection io error
|
||||
#[display(fmt = "{}", _0)]
|
||||
Io(io::Error),
|
||||
}
|
||||
|
||||
impl From<actix_connect::ConnectError> for ConnectError {
|
||||
fn from(err: actix_connect::ConnectError) -> ConnectError {
|
||||
match err {
|
||||
actix_connect::ConnectError::Resolver(e) => ConnectError::Resolver(e),
|
||||
actix_connect::ConnectError::NoRecords => ConnectError::NoRecords,
|
||||
actix_connect::ConnectError::InvalidInput => panic!(),
|
||||
actix_connect::ConnectError::Unresolverd => ConnectError::Unresolverd,
|
||||
actix_connect::ConnectError::Io(e) => ConnectError::Io(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "ssl")]
|
||||
impl<T> From<HandshakeError<T>> for ConnectError {
|
||||
fn from(err: HandshakeError<T>) -> ConnectError {
|
||||
match err {
|
||||
HandshakeError::SetupFailure(stack) => SslError::from(stack).into(),
|
||||
HandshakeError::Failure(stream) => stream.into_error().into(),
|
||||
HandshakeError::WouldBlock(stream) => stream.into_error().into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Display, From)]
|
||||
pub enum InvalidUrl {
|
||||
#[display(fmt = "Missing url scheme")]
|
||||
MissingScheme,
|
||||
#[display(fmt = "Unknown url scheme")]
|
||||
UnknownScheme,
|
||||
#[display(fmt = "Missing host name")]
|
||||
MissingHost,
|
||||
#[display(fmt = "Url parse error: {}", _0)]
|
||||
HttpError(http::Error),
|
||||
}
|
||||
|
||||
/// A set of errors that can occur during request sending and response reading
|
||||
#[derive(Debug, Display, From)]
|
||||
pub enum SendRequestError {
|
||||
/// Invalid URL
|
||||
#[display(fmt = "Invalid URL: {}", _0)]
|
||||
Url(InvalidUrl),
|
||||
/// Failed to connect to host
|
||||
#[display(fmt = "Failed to connect to host: {}", _0)]
|
||||
Connect(ConnectError),
|
||||
/// Error sending request
|
||||
Send(io::Error),
|
||||
/// Error parsing response
|
||||
Response(ParseError),
|
||||
/// Http error
|
||||
#[display(fmt = "{}", _0)]
|
||||
Http(HttpError),
|
||||
/// Http2 error
|
||||
#[display(fmt = "{}", _0)]
|
||||
H2(h2::Error),
|
||||
/// Error sending request body
|
||||
Body(Error),
|
||||
}
|
||||
|
||||
/// Convert `SendRequestError` to a server `Response`
|
||||
impl ResponseError for SendRequestError {
|
||||
fn error_response(&self) -> Response {
|
||||
match *self {
|
||||
SendRequestError::Connect(ConnectError::Timeout) => {
|
||||
Response::GatewayTimeout()
|
||||
}
|
||||
SendRequestError::Connect(_) => Response::BadGateway(),
|
||||
_ => Response::InternalServerError(),
|
||||
}
|
||||
.into()
|
||||
}
|
||||
}
|
248
actix-http/src/client/h1proto.rs
Normal file
248
actix-http/src/client/h1proto.rs
Normal file
@ -0,0 +1,248 @@
|
||||
use std::{io, time};
|
||||
|
||||
use actix_codec::{AsyncRead, AsyncWrite, Framed};
|
||||
use bytes::Bytes;
|
||||
use futures::future::{ok, Either};
|
||||
use futures::{Async, Future, Poll, Sink, Stream};
|
||||
|
||||
use crate::error::PayloadError;
|
||||
use crate::h1;
|
||||
use crate::message::{RequestHead, ResponseHead};
|
||||
use crate::payload::{Payload, PayloadStream};
|
||||
|
||||
use super::connection::{ConnectionLifetime, ConnectionType, IoConnection};
|
||||
use super::error::{ConnectError, SendRequestError};
|
||||
use super::pool::Acquired;
|
||||
use crate::body::{BodyLength, MessageBody};
|
||||
|
||||
pub(crate) fn send_request<T, B>(
|
||||
io: T,
|
||||
head: RequestHead,
|
||||
body: B,
|
||||
created: time::Instant,
|
||||
pool: Option<Acquired<T>>,
|
||||
) -> impl Future<Item = (ResponseHead, Payload), Error = SendRequestError>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + 'static,
|
||||
B: MessageBody,
|
||||
{
|
||||
let io = H1Connection {
|
||||
created,
|
||||
pool,
|
||||
io: Some(io),
|
||||
};
|
||||
|
||||
let len = body.length();
|
||||
|
||||
// create Framed and send reqest
|
||||
Framed::new(io, h1::ClientCodec::default())
|
||||
.send((head, len).into())
|
||||
.from_err()
|
||||
// send request body
|
||||
.and_then(move |framed| match body.length() {
|
||||
BodyLength::None | BodyLength::Empty | BodyLength::Sized(0) => {
|
||||
Either::A(ok(framed))
|
||||
}
|
||||
_ => Either::B(SendBody::new(body, framed)),
|
||||
})
|
||||
// read response and init read body
|
||||
.and_then(|framed| {
|
||||
framed
|
||||
.into_future()
|
||||
.map_err(|(e, _)| SendRequestError::from(e))
|
||||
.and_then(|(item, framed)| {
|
||||
if let Some(res) = item {
|
||||
match framed.get_codec().message_type() {
|
||||
h1::MessageType::None => {
|
||||
let force_close = !framed.get_codec().keepalive();
|
||||
release_connection(framed, force_close);
|
||||
Ok((res, Payload::None))
|
||||
}
|
||||
_ => {
|
||||
let pl: PayloadStream = Box::new(PlStream::new(framed));
|
||||
Ok((res, pl.into()))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Err(ConnectError::Disconnected.into())
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
/// HTTP client connection
|
||||
pub struct H1Connection<T> {
|
||||
io: Option<T>,
|
||||
created: time::Instant,
|
||||
pool: Option<Acquired<T>>,
|
||||
}
|
||||
|
||||
impl<T: AsyncRead + AsyncWrite + 'static> ConnectionLifetime for H1Connection<T> {
|
||||
/// Close connection
|
||||
fn close(&mut self) {
|
||||
if let Some(mut pool) = self.pool.take() {
|
||||
if let Some(io) = self.io.take() {
|
||||
pool.close(IoConnection::new(
|
||||
ConnectionType::H1(io),
|
||||
self.created,
|
||||
None,
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Release this connection to the connection pool
|
||||
fn release(&mut self) {
|
||||
if let Some(mut pool) = self.pool.take() {
|
||||
if let Some(io) = self.io.take() {
|
||||
pool.release(IoConnection::new(
|
||||
ConnectionType::H1(io),
|
||||
self.created,
|
||||
None,
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsyncRead + AsyncWrite + 'static> io::Read for H1Connection<T> {
|
||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
self.io.as_mut().unwrap().read(buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsyncRead + AsyncWrite + 'static> AsyncRead for H1Connection<T> {}
|
||||
|
||||
impl<T: AsyncRead + AsyncWrite + 'static> io::Write for H1Connection<T> {
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
self.io.as_mut().unwrap().write(buf)
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
self.io.as_mut().unwrap().flush()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsyncRead + AsyncWrite + 'static> AsyncWrite for H1Connection<T> {
|
||||
fn shutdown(&mut self) -> Poll<(), io::Error> {
|
||||
self.io.as_mut().unwrap().shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
/// Future responsible for sending request body to the peer
|
||||
pub(crate) struct SendBody<I, B> {
|
||||
body: Option<B>,
|
||||
framed: Option<Framed<I, h1::ClientCodec>>,
|
||||
flushed: bool,
|
||||
}
|
||||
|
||||
impl<I, B> SendBody<I, B>
|
||||
where
|
||||
I: AsyncRead + AsyncWrite + 'static,
|
||||
B: MessageBody,
|
||||
{
|
||||
pub(crate) fn new(body: B, framed: Framed<I, h1::ClientCodec>) -> Self {
|
||||
SendBody {
|
||||
body: Some(body),
|
||||
framed: Some(framed),
|
||||
flushed: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<I, B> Future for SendBody<I, B>
|
||||
where
|
||||
I: ConnectionLifetime,
|
||||
B: MessageBody,
|
||||
{
|
||||
type Item = Framed<I, h1::ClientCodec>;
|
||||
type Error = SendRequestError;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
let mut body_ready = true;
|
||||
loop {
|
||||
while body_ready
|
||||
&& self.body.is_some()
|
||||
&& !self.framed.as_ref().unwrap().is_write_buf_full()
|
||||
{
|
||||
match self.body.as_mut().unwrap().poll_next()? {
|
||||
Async::Ready(item) => {
|
||||
// check if body is done
|
||||
if item.is_none() {
|
||||
let _ = self.body.take();
|
||||
}
|
||||
self.flushed = false;
|
||||
self.framed
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.force_send(h1::Message::Chunk(item))?;
|
||||
break;
|
||||
}
|
||||
Async::NotReady => body_ready = false,
|
||||
}
|
||||
}
|
||||
|
||||
if !self.flushed {
|
||||
match self.framed.as_mut().unwrap().poll_complete()? {
|
||||
Async::Ready(_) => {
|
||||
self.flushed = true;
|
||||
continue;
|
||||
}
|
||||
Async::NotReady => return Ok(Async::NotReady),
|
||||
}
|
||||
}
|
||||
|
||||
if self.body.is_none() {
|
||||
return Ok(Async::Ready(self.framed.take().unwrap()));
|
||||
}
|
||||
return Ok(Async::NotReady);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct PlStream<Io> {
|
||||
framed: Option<Framed<Io, h1::ClientPayloadCodec>>,
|
||||
}
|
||||
|
||||
impl<Io: ConnectionLifetime> PlStream<Io> {
|
||||
fn new(framed: Framed<Io, h1::ClientCodec>) -> Self {
|
||||
PlStream {
|
||||
framed: Some(framed.map_codec(|codec| codec.into_payload_codec())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Io: ConnectionLifetime> Stream for PlStream<Io> {
|
||||
type Item = Bytes;
|
||||
type Error = PayloadError;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
match self.framed.as_mut().unwrap().poll()? {
|
||||
Async::NotReady => Ok(Async::NotReady),
|
||||
Async::Ready(Some(chunk)) => {
|
||||
if let Some(chunk) = chunk {
|
||||
Ok(Async::Ready(Some(chunk)))
|
||||
} else {
|
||||
let framed = self.framed.take().unwrap();
|
||||
let force_close = framed.get_codec().keepalive();
|
||||
release_connection(framed, force_close);
|
||||
Ok(Async::Ready(None))
|
||||
}
|
||||
}
|
||||
Async::Ready(None) => Ok(Async::Ready(None)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn release_connection<T, U>(framed: Framed<T, U>, force_close: bool)
|
||||
where
|
||||
T: ConnectionLifetime,
|
||||
{
|
||||
let mut parts = framed.into_parts();
|
||||
if !force_close && parts.read_buf.is_empty() && parts.write_buf.is_empty() {
|
||||
parts.io.release()
|
||||
} else {
|
||||
parts.io.close()
|
||||
}
|
||||
}
|
185
actix-http/src/client/h2proto.rs
Normal file
185
actix-http/src/client/h2proto.rs
Normal file
@ -0,0 +1,185 @@
|
||||
use std::time;
|
||||
|
||||
use actix_codec::{AsyncRead, AsyncWrite};
|
||||
use bytes::Bytes;
|
||||
use futures::future::{err, Either};
|
||||
use futures::{Async, Future, Poll};
|
||||
use h2::{client::SendRequest, SendStream};
|
||||
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, TRANSFER_ENCODING};
|
||||
use http::{request::Request, HttpTryFrom, Method, Version};
|
||||
|
||||
use crate::body::{BodyLength, MessageBody};
|
||||
use crate::message::{RequestHead, ResponseHead};
|
||||
use crate::payload::Payload;
|
||||
|
||||
use super::connection::{ConnectionType, IoConnection};
|
||||
use super::error::SendRequestError;
|
||||
use super::pool::Acquired;
|
||||
|
||||
pub(crate) fn send_request<T, B>(
|
||||
io: SendRequest<Bytes>,
|
||||
head: RequestHead,
|
||||
body: B,
|
||||
created: time::Instant,
|
||||
pool: Option<Acquired<T>>,
|
||||
) -> impl Future<Item = (ResponseHead, Payload), Error = SendRequestError>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + 'static,
|
||||
B: MessageBody,
|
||||
{
|
||||
trace!("Sending client request: {:?} {:?}", head, body.length());
|
||||
let head_req = head.method == Method::HEAD;
|
||||
let length = body.length();
|
||||
let eof = match length {
|
||||
BodyLength::None | BodyLength::Empty | BodyLength::Sized(0) => true,
|
||||
_ => false,
|
||||
};
|
||||
|
||||
io.ready()
|
||||
.map_err(SendRequestError::from)
|
||||
.and_then(move |mut io| {
|
||||
let mut req = Request::new(());
|
||||
*req.uri_mut() = head.uri;
|
||||
*req.method_mut() = head.method;
|
||||
*req.version_mut() = Version::HTTP_2;
|
||||
|
||||
let mut skip_len = true;
|
||||
// let mut has_date = false;
|
||||
|
||||
// Content length
|
||||
let _ = match length {
|
||||
BodyLength::None => None,
|
||||
BodyLength::Stream => {
|
||||
skip_len = false;
|
||||
None
|
||||
}
|
||||
BodyLength::Empty => req
|
||||
.headers_mut()
|
||||
.insert(CONTENT_LENGTH, HeaderValue::from_static("0")),
|
||||
BodyLength::Sized(len) => req.headers_mut().insert(
|
||||
CONTENT_LENGTH,
|
||||
HeaderValue::try_from(format!("{}", len)).unwrap(),
|
||||
),
|
||||
BodyLength::Sized64(len) => req.headers_mut().insert(
|
||||
CONTENT_LENGTH,
|
||||
HeaderValue::try_from(format!("{}", len)).unwrap(),
|
||||
),
|
||||
};
|
||||
|
||||
// copy headers
|
||||
for (key, value) in head.headers.iter() {
|
||||
match *key {
|
||||
CONNECTION | TRANSFER_ENCODING => continue, // http2 specific
|
||||
CONTENT_LENGTH if skip_len => continue,
|
||||
// DATE => has_date = true,
|
||||
_ => (),
|
||||
}
|
||||
req.headers_mut().append(key, value.clone());
|
||||
}
|
||||
|
||||
match io.send_request(req, eof) {
|
||||
Ok((res, send)) => {
|
||||
release(io, pool, created, false);
|
||||
|
||||
if !eof {
|
||||
Either::A(Either::B(
|
||||
SendBody {
|
||||
body,
|
||||
send,
|
||||
buf: None,
|
||||
}
|
||||
.and_then(move |_| res.map_err(SendRequestError::from)),
|
||||
))
|
||||
} else {
|
||||
Either::B(res.map_err(SendRequestError::from))
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
release(io, pool, created, e.is_io());
|
||||
Either::A(Either::A(err(e.into())))
|
||||
}
|
||||
}
|
||||
})
|
||||
.and_then(move |resp| {
|
||||
let (parts, body) = resp.into_parts();
|
||||
let payload = if head_req { Payload::None } else { body.into() };
|
||||
|
||||
let mut head = ResponseHead::default();
|
||||
head.version = parts.version;
|
||||
head.status = parts.status;
|
||||
head.headers = parts.headers;
|
||||
|
||||
Ok((head, payload))
|
||||
})
|
||||
.from_err()
|
||||
}
|
||||
|
||||
struct SendBody<B: MessageBody> {
|
||||
body: B,
|
||||
send: SendStream<Bytes>,
|
||||
buf: Option<Bytes>,
|
||||
}
|
||||
|
||||
impl<B: MessageBody> Future for SendBody<B> {
|
||||
type Item = ();
|
||||
type Error = SendRequestError;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
loop {
|
||||
if self.buf.is_none() {
|
||||
match self.body.poll_next() {
|
||||
Ok(Async::Ready(Some(buf))) => {
|
||||
self.send.reserve_capacity(buf.len());
|
||||
self.buf = Some(buf);
|
||||
}
|
||||
Ok(Async::Ready(None)) => {
|
||||
if let Err(e) = self.send.send_data(Bytes::new(), true) {
|
||||
return Err(e.into());
|
||||
}
|
||||
self.send.reserve_capacity(0);
|
||||
return Ok(Async::Ready(()));
|
||||
}
|
||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||
Err(e) => return Err(e.into()),
|
||||
}
|
||||
}
|
||||
|
||||
match self.send.poll_capacity() {
|
||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||
Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
|
||||
Ok(Async::Ready(Some(cap))) => {
|
||||
let mut buf = self.buf.take().unwrap();
|
||||
let len = buf.len();
|
||||
let bytes = buf.split_to(std::cmp::min(cap, len));
|
||||
|
||||
if let Err(e) = self.send.send_data(bytes, false) {
|
||||
return Err(e.into());
|
||||
} else {
|
||||
if !buf.is_empty() {
|
||||
self.send.reserve_capacity(buf.len());
|
||||
self.buf = Some(buf);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// release SendRequest object
|
||||
fn release<T: AsyncRead + AsyncWrite + 'static>(
|
||||
io: SendRequest<Bytes>,
|
||||
pool: Option<Acquired<T>>,
|
||||
created: time::Instant,
|
||||
close: bool,
|
||||
) {
|
||||
if let Some(mut pool) = pool {
|
||||
if close {
|
||||
pool.close(IoConnection::new(ConnectionType::H2(io), created, None));
|
||||
} else {
|
||||
pool.release(IoConnection::new(ConnectionType::H2(io), created, None));
|
||||
}
|
||||
}
|
||||
}
|
11
actix-http/src/client/mod.rs
Normal file
11
actix-http/src/client/mod.rs
Normal file
@ -0,0 +1,11 @@
|
||||
//! Http client api
|
||||
mod connection;
|
||||
mod connector;
|
||||
mod error;
|
||||
mod h1proto;
|
||||
mod h2proto;
|
||||
mod pool;
|
||||
|
||||
pub use self::connection::Connection;
|
||||
pub use self::connector::Connector;
|
||||
pub use self::error::{ConnectError, InvalidUrl, SendRequestError};
|
538
actix-http/src/client/pool.rs
Normal file
538
actix-http/src/client/pool.rs
Normal file
@ -0,0 +1,538 @@
|
||||
use std::cell::RefCell;
|
||||
use std::collections::VecDeque;
|
||||
use std::io;
|
||||
use std::rc::Rc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use actix_codec::{AsyncRead, AsyncWrite};
|
||||
use actix_service::Service;
|
||||
use bytes::Bytes;
|
||||
use futures::future::{err, ok, Either, FutureResult};
|
||||
use futures::task::AtomicTask;
|
||||
use futures::unsync::oneshot;
|
||||
use futures::{Async, Future, Poll};
|
||||
use h2::client::{handshake, Handshake};
|
||||
use hashbrown::HashMap;
|
||||
use http::uri::{Authority, Uri};
|
||||
use indexmap::IndexSet;
|
||||
use slab::Slab;
|
||||
use tokio_timer::{sleep, Delay};
|
||||
|
||||
use super::connection::{ConnectionType, IoConnection};
|
||||
use super::error::ConnectError;
|
||||
|
||||
#[derive(Clone, Copy, PartialEq)]
|
||||
pub enum Protocol {
|
||||
Http1,
|
||||
Http2,
|
||||
}
|
||||
|
||||
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
|
||||
pub(crate) struct Key {
|
||||
authority: Authority,
|
||||
}
|
||||
|
||||
impl From<Authority> for Key {
|
||||
fn from(authority: Authority) -> Key {
|
||||
Key { authority }
|
||||
}
|
||||
}
|
||||
|
||||
/// Connections pool
|
||||
pub(crate) struct ConnectionPool<T, Io: AsyncRead + AsyncWrite + 'static>(
|
||||
T,
|
||||
Rc<RefCell<Inner<Io>>>,
|
||||
);
|
||||
|
||||
impl<T, Io> ConnectionPool<T, Io>
|
||||
where
|
||||
Io: AsyncRead + AsyncWrite + 'static,
|
||||
T: Service<Request = Uri, Response = (Io, Protocol), Error = ConnectError>,
|
||||
{
|
||||
pub(crate) fn new(
|
||||
connector: T,
|
||||
conn_lifetime: Duration,
|
||||
conn_keep_alive: Duration,
|
||||
disconnect_timeout: Option<Duration>,
|
||||
limit: usize,
|
||||
) -> Self {
|
||||
ConnectionPool(
|
||||
connector,
|
||||
Rc::new(RefCell::new(Inner {
|
||||
conn_lifetime,
|
||||
conn_keep_alive,
|
||||
disconnect_timeout,
|
||||
limit,
|
||||
acquired: 0,
|
||||
waiters: Slab::new(),
|
||||
waiters_queue: IndexSet::new(),
|
||||
available: HashMap::new(),
|
||||
task: AtomicTask::new(),
|
||||
})),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, Io> Clone for ConnectionPool<T, Io>
|
||||
where
|
||||
T: Clone,
|
||||
Io: AsyncRead + AsyncWrite + 'static,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
ConnectionPool(self.0.clone(), self.1.clone())
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, Io> Service for ConnectionPool<T, Io>
|
||||
where
|
||||
Io: AsyncRead + AsyncWrite + 'static,
|
||||
T: Service<Request = Uri, Response = (Io, Protocol), Error = ConnectError>,
|
||||
{
|
||||
type Request = Uri;
|
||||
type Response = IoConnection<Io>;
|
||||
type Error = ConnectError;
|
||||
type Future = Either<
|
||||
FutureResult<Self::Response, Self::Error>,
|
||||
Either<WaitForConnection<Io>, OpenConnection<T::Future, Io>>,
|
||||
>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
self.0.poll_ready()
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Uri) -> Self::Future {
|
||||
let key = if let Some(authority) = req.authority_part() {
|
||||
authority.clone().into()
|
||||
} else {
|
||||
return Either::A(err(ConnectError::Unresolverd));
|
||||
};
|
||||
|
||||
// acquire connection
|
||||
match self.1.as_ref().borrow_mut().acquire(&key) {
|
||||
Acquire::Acquired(io, created) => {
|
||||
// use existing connection
|
||||
Either::A(ok(IoConnection::new(
|
||||
io,
|
||||
created,
|
||||
Some(Acquired(key, Some(self.1.clone()))),
|
||||
)))
|
||||
}
|
||||
Acquire::NotAvailable => {
|
||||
// connection is not available, wait
|
||||
let (rx, token) = self.1.as_ref().borrow_mut().wait_for(req);
|
||||
Either::B(Either::A(WaitForConnection {
|
||||
rx,
|
||||
key,
|
||||
token,
|
||||
inner: Some(self.1.clone()),
|
||||
}))
|
||||
}
|
||||
Acquire::Available => {
|
||||
// open new connection
|
||||
Either::B(Either::B(OpenConnection::new(
|
||||
key,
|
||||
self.1.clone(),
|
||||
self.0.call(req),
|
||||
)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
pub struct WaitForConnection<Io>
|
||||
where
|
||||
Io: AsyncRead + AsyncWrite + 'static,
|
||||
{
|
||||
key: Key,
|
||||
token: usize,
|
||||
rx: oneshot::Receiver<Result<IoConnection<Io>, ConnectError>>,
|
||||
inner: Option<Rc<RefCell<Inner<Io>>>>,
|
||||
}
|
||||
|
||||
impl<Io> Drop for WaitForConnection<Io>
|
||||
where
|
||||
Io: AsyncRead + AsyncWrite + 'static,
|
||||
{
|
||||
fn drop(&mut self) {
|
||||
if let Some(i) = self.inner.take() {
|
||||
let mut inner = i.as_ref().borrow_mut();
|
||||
inner.release_waiter(&self.key, self.token);
|
||||
inner.check_availibility();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Io> Future for WaitForConnection<Io>
|
||||
where
|
||||
Io: AsyncRead + AsyncWrite,
|
||||
{
|
||||
type Item = IoConnection<Io>;
|
||||
type Error = ConnectError;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
match self.rx.poll() {
|
||||
Ok(Async::Ready(item)) => match item {
|
||||
Err(err) => Err(err),
|
||||
Ok(conn) => {
|
||||
let _ = self.inner.take();
|
||||
Ok(Async::Ready(conn))
|
||||
}
|
||||
},
|
||||
Ok(Async::NotReady) => Ok(Async::NotReady),
|
||||
Err(_) => {
|
||||
let _ = self.inner.take();
|
||||
Err(ConnectError::Disconnected)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
pub struct OpenConnection<F, Io>
|
||||
where
|
||||
Io: AsyncRead + AsyncWrite + 'static,
|
||||
{
|
||||
fut: F,
|
||||
key: Key,
|
||||
h2: Option<Handshake<Io, Bytes>>,
|
||||
inner: Option<Rc<RefCell<Inner<Io>>>>,
|
||||
}
|
||||
|
||||
impl<F, Io> OpenConnection<F, Io>
|
||||
where
|
||||
F: Future<Item = (Io, Protocol), Error = ConnectError>,
|
||||
Io: AsyncRead + AsyncWrite + 'static,
|
||||
{
|
||||
fn new(key: Key, inner: Rc<RefCell<Inner<Io>>>, fut: F) -> Self {
|
||||
OpenConnection {
|
||||
key,
|
||||
fut,
|
||||
inner: Some(inner),
|
||||
h2: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<F, Io> Drop for OpenConnection<F, Io>
|
||||
where
|
||||
Io: AsyncRead + AsyncWrite + 'static,
|
||||
{
|
||||
fn drop(&mut self) {
|
||||
if let Some(inner) = self.inner.take() {
|
||||
let mut inner = inner.as_ref().borrow_mut();
|
||||
inner.release();
|
||||
inner.check_availibility();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<F, Io> Future for OpenConnection<F, Io>
|
||||
where
|
||||
F: Future<Item = (Io, Protocol), Error = ConnectError>,
|
||||
Io: AsyncRead + AsyncWrite,
|
||||
{
|
||||
type Item = IoConnection<Io>;
|
||||
type Error = ConnectError;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
if let Some(ref mut h2) = self.h2 {
|
||||
return match h2.poll() {
|
||||
Ok(Async::Ready((snd, connection))) => {
|
||||
tokio_current_thread::spawn(connection.map_err(|_| ()));
|
||||
Ok(Async::Ready(IoConnection::new(
|
||||
ConnectionType::H2(snd),
|
||||
Instant::now(),
|
||||
Some(Acquired(self.key.clone(), self.inner.clone())),
|
||||
)))
|
||||
}
|
||||
Ok(Async::NotReady) => Ok(Async::NotReady),
|
||||
Err(e) => Err(e.into()),
|
||||
};
|
||||
}
|
||||
|
||||
match self.fut.poll() {
|
||||
Err(err) => Err(err),
|
||||
Ok(Async::Ready((io, proto))) => {
|
||||
let _ = self.inner.take();
|
||||
if proto == Protocol::Http1 {
|
||||
Ok(Async::Ready(IoConnection::new(
|
||||
ConnectionType::H1(io),
|
||||
Instant::now(),
|
||||
Some(Acquired(self.key.clone(), self.inner.clone())),
|
||||
)))
|
||||
} else {
|
||||
self.h2 = Some(handshake(io));
|
||||
self.poll()
|
||||
}
|
||||
}
|
||||
Ok(Async::NotReady) => Ok(Async::NotReady),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum Acquire<T> {
|
||||
Acquired(ConnectionType<T>, Instant),
|
||||
Available,
|
||||
NotAvailable,
|
||||
}
|
||||
|
||||
// #[derive(Debug)]
|
||||
struct AvailableConnection<Io> {
|
||||
io: ConnectionType<Io>,
|
||||
used: Instant,
|
||||
created: Instant,
|
||||
}
|
||||
|
||||
pub(crate) struct Inner<Io> {
|
||||
conn_lifetime: Duration,
|
||||
conn_keep_alive: Duration,
|
||||
disconnect_timeout: Option<Duration>,
|
||||
limit: usize,
|
||||
acquired: usize,
|
||||
available: HashMap<Key, VecDeque<AvailableConnection<Io>>>,
|
||||
waiters: Slab<(Uri, oneshot::Sender<Result<IoConnection<Io>, ConnectError>>)>,
|
||||
waiters_queue: IndexSet<(Key, usize)>,
|
||||
task: AtomicTask,
|
||||
}
|
||||
|
||||
impl<Io> Inner<Io> {
|
||||
fn reserve(&mut self) {
|
||||
self.acquired += 1;
|
||||
}
|
||||
|
||||
fn release(&mut self) {
|
||||
self.acquired -= 1;
|
||||
}
|
||||
|
||||
fn release_waiter(&mut self, key: &Key, token: usize) {
|
||||
self.waiters.remove(token);
|
||||
self.waiters_queue.remove(&(key.clone(), token));
|
||||
}
|
||||
|
||||
fn release_conn(&mut self, key: &Key, io: ConnectionType<Io>, created: Instant) {
|
||||
self.acquired -= 1;
|
||||
self.available
|
||||
.entry(key.clone())
|
||||
.or_insert_with(VecDeque::new)
|
||||
.push_back(AvailableConnection {
|
||||
io,
|
||||
created,
|
||||
used: Instant::now(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
impl<Io> Inner<Io>
|
||||
where
|
||||
Io: AsyncRead + AsyncWrite + 'static,
|
||||
{
|
||||
/// connection is not available, wait
|
||||
fn wait_for(
|
||||
&mut self,
|
||||
connect: Uri,
|
||||
) -> (
|
||||
oneshot::Receiver<Result<IoConnection<Io>, ConnectError>>,
|
||||
usize,
|
||||
) {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
let key: Key = connect.authority_part().unwrap().clone().into();
|
||||
let entry = self.waiters.vacant_entry();
|
||||
let token = entry.key();
|
||||
entry.insert((connect, tx));
|
||||
assert!(!self.waiters_queue.insert((key, token)));
|
||||
(rx, token)
|
||||
}
|
||||
|
||||
fn acquire(&mut self, key: &Key) -> Acquire<Io> {
|
||||
// check limits
|
||||
if self.limit > 0 && self.acquired >= self.limit {
|
||||
return Acquire::NotAvailable;
|
||||
}
|
||||
|
||||
self.reserve();
|
||||
|
||||
// check if open connection is available
|
||||
// cleanup stale connections at the same time
|
||||
if let Some(ref mut connections) = self.available.get_mut(key) {
|
||||
let now = Instant::now();
|
||||
while let Some(conn) = connections.pop_back() {
|
||||
// check if it still usable
|
||||
if (now - conn.used) > self.conn_keep_alive
|
||||
|| (now - conn.created) > self.conn_lifetime
|
||||
{
|
||||
if let Some(timeout) = self.disconnect_timeout {
|
||||
if let ConnectionType::H1(io) = conn.io {
|
||||
tokio_current_thread::spawn(CloseConnection::new(
|
||||
io, timeout,
|
||||
))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
let mut io = conn.io;
|
||||
let mut buf = [0; 2];
|
||||
if let ConnectionType::H1(ref mut s) = io {
|
||||
match s.read(&mut buf) {
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => (),
|
||||
Ok(n) if n > 0 => {
|
||||
if let Some(timeout) = self.disconnect_timeout {
|
||||
if let ConnectionType::H1(io) = io {
|
||||
tokio_current_thread::spawn(
|
||||
CloseConnection::new(io, timeout),
|
||||
)
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
Ok(_) | Err(_) => continue,
|
||||
}
|
||||
}
|
||||
return Acquire::Acquired(io, conn.created);
|
||||
}
|
||||
}
|
||||
}
|
||||
Acquire::Available
|
||||
}
|
||||
|
||||
fn release_close(&mut self, io: ConnectionType<Io>) {
|
||||
self.acquired -= 1;
|
||||
if let Some(timeout) = self.disconnect_timeout {
|
||||
if let ConnectionType::H1(io) = io {
|
||||
tokio_current_thread::spawn(CloseConnection::new(io, timeout))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn check_availibility(&self) {
|
||||
if !self.waiters_queue.is_empty() && self.acquired < self.limit {
|
||||
self.task.notify()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// struct ConnectorPoolSupport<T, Io>
|
||||
// where
|
||||
// Io: AsyncRead + AsyncWrite + 'static,
|
||||
// {
|
||||
// connector: T,
|
||||
// inner: Rc<RefCell<Inner<Io>>>,
|
||||
// }
|
||||
|
||||
// impl<T, Io> Future for ConnectorPoolSupport<T, Io>
|
||||
// where
|
||||
// Io: AsyncRead + AsyncWrite + 'static,
|
||||
// T: Service<Connect, Response = (Io, Protocol), Error = ConnectorError>,
|
||||
// T::Future: 'static,
|
||||
// {
|
||||
// type Item = ();
|
||||
// type Error = ();
|
||||
|
||||
// fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
// let mut inner = self.inner.as_ref().borrow_mut();
|
||||
// inner.task.register();
|
||||
|
||||
// // check waiters
|
||||
// loop {
|
||||
// let (key, token) = {
|
||||
// if let Some((key, token)) = inner.waiters_queue.get_index(0) {
|
||||
// (key.clone(), *token)
|
||||
// } else {
|
||||
// break;
|
||||
// }
|
||||
// };
|
||||
// match inner.acquire(&key) {
|
||||
// Acquire::NotAvailable => break,
|
||||
// Acquire::Acquired(io, created) => {
|
||||
// let (_, tx) = inner.waiters.remove(token);
|
||||
// if let Err(conn) = tx.send(Ok(IoConnection::new(
|
||||
// io,
|
||||
// created,
|
||||
// Some(Acquired(key.clone(), Some(self.inner.clone()))),
|
||||
// ))) {
|
||||
// let (io, created) = conn.unwrap().into_inner();
|
||||
// inner.release_conn(&key, io, created);
|
||||
// }
|
||||
// }
|
||||
// Acquire::Available => {
|
||||
// let (connect, tx) = inner.waiters.remove(token);
|
||||
// OpenWaitingConnection::spawn(
|
||||
// key.clone(),
|
||||
// tx,
|
||||
// self.inner.clone(),
|
||||
// self.connector.call(connect),
|
||||
// );
|
||||
// }
|
||||
// }
|
||||
// let _ = inner.waiters_queue.swap_remove_index(0);
|
||||
// }
|
||||
|
||||
// Ok(Async::NotReady)
|
||||
// }
|
||||
// }
|
||||
|
||||
struct CloseConnection<T> {
|
||||
io: T,
|
||||
timeout: Delay,
|
||||
}
|
||||
|
||||
impl<T> CloseConnection<T>
|
||||
where
|
||||
T: AsyncWrite,
|
||||
{
|
||||
fn new(io: T, timeout: Duration) -> Self {
|
||||
CloseConnection {
|
||||
io,
|
||||
timeout: sleep(timeout),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Future for CloseConnection<T>
|
||||
where
|
||||
T: AsyncWrite,
|
||||
{
|
||||
type Item = ();
|
||||
type Error = ();
|
||||
|
||||
fn poll(&mut self) -> Poll<(), ()> {
|
||||
match self.timeout.poll() {
|
||||
Ok(Async::Ready(_)) | Err(_) => Ok(Async::Ready(())),
|
||||
Ok(Async::NotReady) => match self.io.shutdown() {
|
||||
Ok(Async::Ready(_)) | Err(_) => Ok(Async::Ready(())),
|
||||
Ok(Async::NotReady) => Ok(Async::NotReady),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct Acquired<T>(Key, Option<Rc<RefCell<Inner<T>>>>);
|
||||
|
||||
impl<T> Acquired<T>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + 'static,
|
||||
{
|
||||
pub(crate) fn close(&mut self, conn: IoConnection<T>) {
|
||||
if let Some(inner) = self.1.take() {
|
||||
let (io, _) = conn.into_inner();
|
||||
inner.as_ref().borrow_mut().release_close(io);
|
||||
}
|
||||
}
|
||||
pub(crate) fn release(&mut self, conn: IoConnection<T>) {
|
||||
if let Some(inner) = self.1.take() {
|
||||
let (io, created) = conn.into_inner();
|
||||
inner
|
||||
.as_ref()
|
||||
.borrow_mut()
|
||||
.release_conn(&self.0, io, created);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Drop for Acquired<T> {
|
||||
fn drop(&mut self) {
|
||||
if let Some(inner) = self.1.take() {
|
||||
inner.as_ref().borrow_mut().release();
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user