mirror of
https://github.com/actix/actix-extras.git
synced 2024-11-23 23:51:06 +01:00
migrate client to std::future
This commit is contained in:
parent
8cba1170e6
commit
9e95efcc16
@ -1,9 +1,10 @@
|
|||||||
|
use std::pin::Pin;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
use std::{fmt, io, time};
|
use std::{fmt, io, time};
|
||||||
|
|
||||||
use actix_codec::{AsyncRead, AsyncWrite, Framed};
|
use actix_codec::{AsyncRead, AsyncWrite, Framed};
|
||||||
use bytes::{Buf, Bytes};
|
use bytes::{Buf, Bytes};
|
||||||
use futures::future::{err, Either, Future, FutureResult};
|
use futures::future::{err, Either, Future, FutureExt, LocalBoxFuture, Ready};
|
||||||
use futures::Poll;
|
|
||||||
use h2::client::SendRequest;
|
use h2::client::SendRequest;
|
||||||
|
|
||||||
use crate::body::MessageBody;
|
use crate::body::MessageBody;
|
||||||
@ -22,7 +23,7 @@ pub(crate) enum ConnectionType<Io> {
|
|||||||
|
|
||||||
pub trait Connection {
|
pub trait Connection {
|
||||||
type Io: AsyncRead + AsyncWrite;
|
type Io: AsyncRead + AsyncWrite;
|
||||||
type Future: Future<Item = (ResponseHead, Payload), Error = SendRequestError>;
|
type Future: Future<Output = Result<(ResponseHead, Payload), SendRequestError>>;
|
||||||
|
|
||||||
fn protocol(&self) -> Protocol;
|
fn protocol(&self) -> Protocol;
|
||||||
|
|
||||||
@ -34,15 +35,16 @@ pub trait Connection {
|
|||||||
) -> Self::Future;
|
) -> Self::Future;
|
||||||
|
|
||||||
type TunnelFuture: Future<
|
type TunnelFuture: Future<
|
||||||
Item = (ResponseHead, Framed<Self::Io, ClientCodec>),
|
Output = Result<(ResponseHead, Framed<Self::Io, ClientCodec>), SendRequestError>,
|
||||||
Error = SendRequestError,
|
|
||||||
>;
|
>;
|
||||||
|
|
||||||
/// Send request, returns Response and Framed
|
/// Send request, returns Response and Framed
|
||||||
fn open_tunnel<H: Into<RequestHeadType>>(self, head: H) -> Self::TunnelFuture;
|
fn open_tunnel<H: Into<RequestHeadType>>(self, head: H) -> Self::TunnelFuture;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) trait ConnectionLifetime: AsyncRead + AsyncWrite + 'static {
|
pub(crate) trait ConnectionLifetime:
|
||||||
|
AsyncRead + AsyncWrite + Unpin + 'static
|
||||||
|
{
|
||||||
/// Close connection
|
/// Close connection
|
||||||
fn close(&mut self);
|
fn close(&mut self);
|
||||||
|
|
||||||
@ -91,11 +93,11 @@ impl<T: AsyncRead + AsyncWrite> IoConnection<T> {
|
|||||||
|
|
||||||
impl<T> Connection for IoConnection<T>
|
impl<T> Connection for IoConnection<T>
|
||||||
where
|
where
|
||||||
T: AsyncRead + AsyncWrite + 'static,
|
T: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
{
|
{
|
||||||
type Io = T;
|
type Io = T;
|
||||||
type Future =
|
type Future =
|
||||||
Box<dyn Future<Item = (ResponseHead, Payload), Error = SendRequestError>>;
|
LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>>;
|
||||||
|
|
||||||
fn protocol(&self) -> Protocol {
|
fn protocol(&self) -> Protocol {
|
||||||
match self.io {
|
match self.io {
|
||||||
@ -111,38 +113,30 @@ where
|
|||||||
body: B,
|
body: B,
|
||||||
) -> Self::Future {
|
) -> Self::Future {
|
||||||
match self.io.take().unwrap() {
|
match self.io.take().unwrap() {
|
||||||
ConnectionType::H1(io) => Box::new(h1proto::send_request(
|
ConnectionType::H1(io) => {
|
||||||
io,
|
h1proto::send_request(io, head.into(), body, self.created, self.pool)
|
||||||
head.into(),
|
.boxed_local()
|
||||||
body,
|
}
|
||||||
self.created,
|
ConnectionType::H2(io) => {
|
||||||
self.pool,
|
h2proto::send_request(io, head.into(), body, self.created, self.pool)
|
||||||
)),
|
.boxed_local()
|
||||||
ConnectionType::H2(io) => Box::new(h2proto::send_request(
|
}
|
||||||
io,
|
|
||||||
head.into(),
|
|
||||||
body,
|
|
||||||
self.created,
|
|
||||||
self.pool,
|
|
||||||
)),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type TunnelFuture = Either<
|
type TunnelFuture = Either<
|
||||||
Box<
|
LocalBoxFuture<
|
||||||
dyn Future<
|
'static,
|
||||||
Item = (ResponseHead, Framed<Self::Io, ClientCodec>),
|
Result<(ResponseHead, Framed<Self::Io, ClientCodec>), SendRequestError>,
|
||||||
Error = SendRequestError,
|
|
||||||
>,
|
|
||||||
>,
|
>,
|
||||||
FutureResult<(ResponseHead, Framed<Self::Io, ClientCodec>), SendRequestError>,
|
Ready<Result<(ResponseHead, Framed<Self::Io, ClientCodec>), SendRequestError>>,
|
||||||
>;
|
>;
|
||||||
|
|
||||||
/// Send request, returns Response and Framed
|
/// Send request, returns Response and Framed
|
||||||
fn open_tunnel<H: Into<RequestHeadType>>(mut self, head: H) -> Self::TunnelFuture {
|
fn open_tunnel<H: Into<RequestHeadType>>(mut self, head: H) -> Self::TunnelFuture {
|
||||||
match self.io.take().unwrap() {
|
match self.io.take().unwrap() {
|
||||||
ConnectionType::H1(io) => {
|
ConnectionType::H1(io) => {
|
||||||
Either::A(Box::new(h1proto::open_tunnel(io, head.into())))
|
Either::Left(h1proto::open_tunnel(io, head.into()).boxed_local())
|
||||||
}
|
}
|
||||||
ConnectionType::H2(io) => {
|
ConnectionType::H2(io) => {
|
||||||
if let Some(mut pool) = self.pool.take() {
|
if let Some(mut pool) = self.pool.take() {
|
||||||
@ -152,7 +146,7 @@ where
|
|||||||
None,
|
None,
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
Either::B(err(SendRequestError::TunnelNotSupported))
|
Either::Right(err(SendRequestError::TunnelNotSupported))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -166,12 +160,12 @@ pub(crate) enum EitherConnection<A, B> {
|
|||||||
|
|
||||||
impl<A, B> Connection for EitherConnection<A, B>
|
impl<A, B> Connection for EitherConnection<A, B>
|
||||||
where
|
where
|
||||||
A: AsyncRead + AsyncWrite + 'static,
|
A: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
B: AsyncRead + AsyncWrite + 'static,
|
B: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
{
|
{
|
||||||
type Io = EitherIo<A, B>;
|
type Io = EitherIo<A, B>;
|
||||||
type Future =
|
type Future =
|
||||||
Box<dyn Future<Item = (ResponseHead, Payload), Error = SendRequestError>>;
|
LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>>;
|
||||||
|
|
||||||
fn protocol(&self) -> Protocol {
|
fn protocol(&self) -> Protocol {
|
||||||
match self {
|
match self {
|
||||||
@ -191,24 +185,22 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type TunnelFuture = Box<
|
type TunnelFuture = LocalBoxFuture<
|
||||||
dyn Future<
|
'static,
|
||||||
Item = (ResponseHead, Framed<Self::Io, ClientCodec>),
|
Result<(ResponseHead, Framed<Self::Io, ClientCodec>), SendRequestError>,
|
||||||
Error = SendRequestError,
|
|
||||||
>,
|
|
||||||
>;
|
>;
|
||||||
|
|
||||||
/// Send request, returns Response and Framed
|
/// Send request, returns Response and Framed
|
||||||
fn open_tunnel<H: Into<RequestHeadType>>(self, head: H) -> Self::TunnelFuture {
|
fn open_tunnel<H: Into<RequestHeadType>>(self, head: H) -> Self::TunnelFuture {
|
||||||
match self {
|
match self {
|
||||||
EitherConnection::A(con) => Box::new(
|
EitherConnection::A(con) => con
|
||||||
con.open_tunnel(head)
|
.open_tunnel(head)
|
||||||
.map(|(head, framed)| (head, framed.map_io(EitherIo::A))),
|
.map(|res| res.map(|(head, framed)| (head, framed.map_io(EitherIo::A))))
|
||||||
),
|
.boxed_local(),
|
||||||
EitherConnection::B(con) => Box::new(
|
EitherConnection::B(con) => con
|
||||||
con.open_tunnel(head)
|
.open_tunnel(head)
|
||||||
.map(|(head, framed)| (head, framed.map_io(EitherIo::B))),
|
.map(|res| res.map(|(head, framed)| (head, framed.map_io(EitherIo::B))))
|
||||||
),
|
.boxed_local(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -218,24 +210,22 @@ pub enum EitherIo<A, B> {
|
|||||||
B(B),
|
B(B),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<A, B> io::Read for EitherIo<A, B>
|
|
||||||
where
|
|
||||||
A: io::Read,
|
|
||||||
B: io::Read,
|
|
||||||
{
|
|
||||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
|
||||||
match self {
|
|
||||||
EitherIo::A(ref mut val) => val.read(buf),
|
|
||||||
EitherIo::B(ref mut val) => val.read(buf),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<A, B> AsyncRead for EitherIo<A, B>
|
impl<A, B> AsyncRead for EitherIo<A, B>
|
||||||
where
|
where
|
||||||
A: AsyncRead,
|
A: AsyncRead + Unpin,
|
||||||
B: AsyncRead,
|
B: AsyncRead + Unpin,
|
||||||
{
|
{
|
||||||
|
fn poll_read(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &mut [u8],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
match self.get_mut() {
|
||||||
|
EitherIo::A(ref mut val) => Pin::new(val).poll_read(cx, buf),
|
||||||
|
EitherIo::B(ref mut val) => Pin::new(val).poll_read(cx, buf),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
|
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
|
||||||
match self {
|
match self {
|
||||||
EitherIo::A(ref val) => val.prepare_uninitialized_buffer(buf),
|
EitherIo::A(ref val) => val.prepare_uninitialized_buffer(buf),
|
||||||
@ -244,45 +234,50 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<A, B> io::Write for EitherIo<A, B>
|
|
||||||
where
|
|
||||||
A: io::Write,
|
|
||||||
B: io::Write,
|
|
||||||
{
|
|
||||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
|
||||||
match self {
|
|
||||||
EitherIo::A(ref mut val) => val.write(buf),
|
|
||||||
EitherIo::B(ref mut val) => val.write(buf),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn flush(&mut self) -> io::Result<()> {
|
|
||||||
match self {
|
|
||||||
EitherIo::A(ref mut val) => val.flush(),
|
|
||||||
EitherIo::B(ref mut val) => val.flush(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<A, B> AsyncWrite for EitherIo<A, B>
|
impl<A, B> AsyncWrite for EitherIo<A, B>
|
||||||
where
|
where
|
||||||
A: AsyncWrite,
|
A: AsyncWrite + Unpin,
|
||||||
B: AsyncWrite,
|
B: AsyncWrite + Unpin,
|
||||||
{
|
{
|
||||||
fn shutdown(&mut self) -> Poll<(), io::Error> {
|
fn poll_write(
|
||||||
match self {
|
self: Pin<&mut Self>,
|
||||||
EitherIo::A(ref mut val) => val.shutdown(),
|
cx: &mut Context,
|
||||||
EitherIo::B(ref mut val) => val.shutdown(),
|
buf: &[u8],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
match self.get_mut() {
|
||||||
|
EitherIo::A(ref mut val) => Pin::new(val).poll_write(cx, buf),
|
||||||
|
EitherIo::B(ref mut val) => Pin::new(val).poll_write(cx, buf),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn write_buf<U: Buf>(&mut self, buf: &mut U) -> Poll<usize, io::Error>
|
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
match self.get_mut() {
|
||||||
|
EitherIo::A(ref mut val) => Pin::new(val).poll_flush(cx),
|
||||||
|
EitherIo::B(ref mut val) => Pin::new(val).poll_flush(cx),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_shutdown(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
) -> Poll<io::Result<()>> {
|
||||||
|
match self.get_mut() {
|
||||||
|
EitherIo::A(ref mut val) => Pin::new(val).poll_shutdown(cx),
|
||||||
|
EitherIo::B(ref mut val) => Pin::new(val).poll_shutdown(cx),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_write_buf<U: Buf>(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &mut U,
|
||||||
|
) -> Poll<Result<usize, io::Error>>
|
||||||
where
|
where
|
||||||
Self: Sized,
|
Self: Sized,
|
||||||
{
|
{
|
||||||
match self {
|
match self.get_mut() {
|
||||||
EitherIo::A(ref mut val) => val.write_buf(buf),
|
EitherIo::A(ref mut val) => Pin::new(val).poll_write_buf(cx, buf),
|
||||||
EitherIo::B(ref mut val) => val.write_buf(buf),
|
EitherIo::B(ref mut val) => Pin::new(val).poll_write_buf(cx, buf),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -11,6 +11,7 @@ use actix_connect::{
|
|||||||
};
|
};
|
||||||
use actix_service::{apply_fn, Service};
|
use actix_service::{apply_fn, Service};
|
||||||
use actix_utils::timeout::{TimeoutError, TimeoutService};
|
use actix_utils::timeout::{TimeoutError, TimeoutService};
|
||||||
|
use futures::future::Ready;
|
||||||
use http::Uri;
|
use http::Uri;
|
||||||
use tokio_net::tcp::TcpStream;
|
use tokio_net::tcp::TcpStream;
|
||||||
|
|
||||||
@ -116,12 +117,13 @@ impl<T, U> Connector<T, U> {
|
|||||||
/// Use custom connector.
|
/// Use custom connector.
|
||||||
pub fn connector<T1, U1>(self, connector: T1) -> Connector<T1, U1>
|
pub fn connector<T1, U1>(self, connector: T1) -> Connector<T1, U1>
|
||||||
where
|
where
|
||||||
U1: AsyncRead + AsyncWrite + fmt::Debug,
|
U1: AsyncRead + AsyncWrite + Unpin + fmt::Debug,
|
||||||
T1: Service<
|
T1: Service<
|
||||||
Request = TcpConnect<Uri>,
|
Request = TcpConnect<Uri>,
|
||||||
Response = TcpConnection<Uri, U1>,
|
Response = TcpConnection<Uri, U1>,
|
||||||
Error = actix_connect::ConnectError,
|
Error = actix_connect::ConnectError,
|
||||||
> + Clone,
|
> + Clone,
|
||||||
|
T1::Future: Unpin,
|
||||||
{
|
{
|
||||||
Connector {
|
Connector {
|
||||||
connector,
|
connector,
|
||||||
@ -138,13 +140,12 @@ impl<T, U> Connector<T, U> {
|
|||||||
|
|
||||||
impl<T, U> Connector<T, U>
|
impl<T, U> Connector<T, U>
|
||||||
where
|
where
|
||||||
U: AsyncRead + AsyncWrite + fmt::Debug + 'static,
|
U: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static,
|
||||||
T: Service<
|
T: Service<
|
||||||
Request = TcpConnect<Uri>,
|
Request = TcpConnect<Uri>,
|
||||||
Response = TcpConnection<Uri, U>,
|
Response = TcpConnection<Uri, U>,
|
||||||
Error = actix_connect::ConnectError,
|
Error = actix_connect::ConnectError,
|
||||||
> + Clone
|
> + 'static,
|
||||||
+ 'static,
|
|
||||||
{
|
{
|
||||||
/// Connection timeout, i.e. max time to connect to remote host including dns name resolution.
|
/// Connection timeout, i.e. max time to connect to remote host including dns name resolution.
|
||||||
/// Set to 1 second by default.
|
/// Set to 1 second by default.
|
||||||
@ -220,7 +221,7 @@ where
|
|||||||
{
|
{
|
||||||
let connector = TimeoutService::new(
|
let connector = TimeoutService::new(
|
||||||
self.timeout,
|
self.timeout,
|
||||||
apply_fn(self.connector, |msg: Connect, srv| {
|
apply_fn(UnpinWrapper(self.connector), |msg: Connect, srv| {
|
||||||
srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr))
|
srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr))
|
||||||
})
|
})
|
||||||
.map_err(ConnectError::from)
|
.map_err(ConnectError::from)
|
||||||
@ -337,10 +338,48 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct UnpinWrapper<T>(T);
|
||||||
|
|
||||||
|
impl<T: Service> Unpin for UnpinWrapper<T> {}
|
||||||
|
|
||||||
|
impl<T: Service> Service for UnpinWrapper<T> {
|
||||||
|
type Request = T::Request;
|
||||||
|
type Response = T::Response;
|
||||||
|
type Error = T::Error;
|
||||||
|
type Future = UnpinWrapperFut<T>;
|
||||||
|
|
||||||
|
fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), T::Error>> {
|
||||||
|
self.0.poll_ready(cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn call(&mut self, req: T::Request) -> Self::Future {
|
||||||
|
UnpinWrapperFut {
|
||||||
|
fut: self.0.call(req),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct UnpinWrapperFut<T: Service> {
|
||||||
|
fut: T::Future,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Service> Unpin for UnpinWrapperFut<T> {}
|
||||||
|
|
||||||
|
impl<T: Service> Future for UnpinWrapperFut<T> {
|
||||||
|
type Output = Result<T::Response, T::Error>;
|
||||||
|
|
||||||
|
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||||
|
unsafe { Pin::new_unchecked(&mut self.get_mut().fut) }.poll(cx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(not(any(feature = "ssl", feature = "rust-tls")))]
|
#[cfg(not(any(feature = "ssl", feature = "rust-tls")))]
|
||||||
mod connect_impl {
|
mod connect_impl {
|
||||||
use futures::future::{err, Either, FutureResult};
|
use std::task::{Context, Poll};
|
||||||
use futures::Poll;
|
|
||||||
|
use futures::future::{err, Either, Ready};
|
||||||
|
use futures::ready;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::client::connection::IoConnection;
|
use crate::client::connection::IoConnection;
|
||||||
@ -349,7 +388,7 @@ mod connect_impl {
|
|||||||
where
|
where
|
||||||
Io: AsyncRead + AsyncWrite + 'static,
|
Io: AsyncRead + AsyncWrite + 'static,
|
||||||
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
|
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
|
||||||
+ Clone
|
+ Unpin
|
||||||
+ 'static,
|
+ 'static,
|
||||||
{
|
{
|
||||||
pub(crate) tcp_pool: ConnectionPool<T, Io>,
|
pub(crate) tcp_pool: ConnectionPool<T, Io>,
|
||||||
@ -359,7 +398,7 @@ mod connect_impl {
|
|||||||
where
|
where
|
||||||
Io: AsyncRead + AsyncWrite + 'static,
|
Io: AsyncRead + AsyncWrite + 'static,
|
||||||
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
|
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
|
||||||
+ Clone
|
+ Unpin
|
||||||
+ 'static,
|
+ 'static,
|
||||||
{
|
{
|
||||||
fn clone(&self) -> Self {
|
fn clone(&self) -> Self {
|
||||||
@ -371,29 +410,30 @@ mod connect_impl {
|
|||||||
|
|
||||||
impl<T, Io> Service for InnerConnector<T, Io>
|
impl<T, Io> Service for InnerConnector<T, Io>
|
||||||
where
|
where
|
||||||
Io: AsyncRead + AsyncWrite + 'static,
|
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
|
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
|
||||||
+ Clone
|
+ Unpin
|
||||||
+ 'static,
|
+ 'static,
|
||||||
|
T::Future: Unpin,
|
||||||
{
|
{
|
||||||
type Request = Connect;
|
type Request = Connect;
|
||||||
type Response = IoConnection<Io>;
|
type Response = IoConnection<Io>;
|
||||||
type Error = ConnectError;
|
type Error = ConnectError;
|
||||||
type Future = Either<
|
type Future = Either<
|
||||||
<ConnectionPool<T, Io> as Service>::Future,
|
<ConnectionPool<T, Io> as Service>::Future,
|
||||||
FutureResult<IoConnection<Io>, ConnectError>,
|
Ready<Result<IoConnection<Io>, ConnectError>>,
|
||||||
>;
|
>;
|
||||||
|
|
||||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
|
||||||
self.tcp_pool.poll_ready()
|
self.tcp_pool.poll_ready(cx)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn call(&mut self, req: Connect) -> Self::Future {
|
fn call(&mut self, req: Connect) -> Self::Future {
|
||||||
match req.uri.scheme_str() {
|
match req.uri.scheme_str() {
|
||||||
Some("https") | Some("wss") => {
|
Some("https") | Some("wss") => {
|
||||||
Either::B(err(ConnectError::SslIsNotSupported))
|
Either::Right(err(ConnectError::SslIsNotSupported))
|
||||||
}
|
}
|
||||||
_ => Either::A(self.tcp_pool.call(req)),
|
_ => Either::Left(self.tcp_pool.call(req)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -403,18 +443,20 @@ mod connect_impl {
|
|||||||
mod connect_impl {
|
mod connect_impl {
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
|
|
||||||
use futures::future::{Either, FutureResult};
|
use futures::future::Either;
|
||||||
use futures::{Async, Future, Poll};
|
use futures::ready;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::client::connection::EitherConnection;
|
use crate::client::connection::EitherConnection;
|
||||||
|
|
||||||
pub(crate) struct InnerConnector<T1, T2, Io1, Io2>
|
pub(crate) struct InnerConnector<T1, T2, Io1, Io2>
|
||||||
where
|
where
|
||||||
Io1: AsyncRead + AsyncWrite + 'static,
|
Io1: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
Io2: AsyncRead + AsyncWrite + 'static,
|
Io2: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
T1: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>,
|
T1: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>,
|
||||||
T2: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>,
|
T2: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>,
|
||||||
|
T1::Future: Unpin,
|
||||||
|
T2::Future: Unpin,
|
||||||
{
|
{
|
||||||
pub(crate) tcp_pool: ConnectionPool<T1, Io1>,
|
pub(crate) tcp_pool: ConnectionPool<T1, Io1>,
|
||||||
pub(crate) ssl_pool: ConnectionPool<T2, Io2>,
|
pub(crate) ssl_pool: ConnectionPool<T2, Io2>,
|
||||||
@ -422,14 +464,16 @@ mod connect_impl {
|
|||||||
|
|
||||||
impl<T1, T2, Io1, Io2> Clone for InnerConnector<T1, T2, Io1, Io2>
|
impl<T1, T2, Io1, Io2> Clone for InnerConnector<T1, T2, Io1, Io2>
|
||||||
where
|
where
|
||||||
Io1: AsyncRead + AsyncWrite + 'static,
|
Io1: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
Io2: AsyncRead + AsyncWrite + 'static,
|
Io2: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
T1: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>
|
T1: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>
|
||||||
+ Clone
|
+ Unpin
|
||||||
+ 'static,
|
+ 'static,
|
||||||
T2: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>
|
T2: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>
|
||||||
+ Clone
|
+ Unpin
|
||||||
+ 'static,
|
+ 'static,
|
||||||
|
T1::Future: Unpin,
|
||||||
|
T2::Future: Unpin,
|
||||||
{
|
{
|
||||||
fn clone(&self) -> Self {
|
fn clone(&self) -> Self {
|
||||||
InnerConnector {
|
InnerConnector {
|
||||||
@ -441,52 +485,50 @@ mod connect_impl {
|
|||||||
|
|
||||||
impl<T1, T2, Io1, Io2> Service for InnerConnector<T1, T2, Io1, Io2>
|
impl<T1, T2, Io1, Io2> Service for InnerConnector<T1, T2, Io1, Io2>
|
||||||
where
|
where
|
||||||
Io1: AsyncRead + AsyncWrite + 'static,
|
Io1: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
Io2: AsyncRead + AsyncWrite + 'static,
|
Io2: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
T1: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>
|
T1: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>
|
||||||
+ Clone
|
+ Unpin
|
||||||
+ 'static,
|
+ 'static,
|
||||||
T2: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>
|
T2: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>
|
||||||
+ Clone
|
+ Unpin
|
||||||
+ 'static,
|
+ 'static,
|
||||||
|
T1::Future: Unpin,
|
||||||
|
T2::Future: Unpin,
|
||||||
{
|
{
|
||||||
type Request = Connect;
|
type Request = Connect;
|
||||||
type Response = EitherConnection<Io1, Io2>;
|
type Response = EitherConnection<Io1, Io2>;
|
||||||
type Error = ConnectError;
|
type Error = ConnectError;
|
||||||
type Future = Either<
|
type Future = Either<
|
||||||
FutureResult<Self::Response, Self::Error>,
|
InnerConnectorResponseA<T1, Io1, Io2>,
|
||||||
Either<
|
InnerConnectorResponseB<T2, Io1, Io2>,
|
||||||
InnerConnectorResponseA<T1, Io1, Io2>,
|
|
||||||
InnerConnectorResponseB<T2, Io1, Io2>,
|
|
||||||
>,
|
|
||||||
>;
|
>;
|
||||||
|
|
||||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
|
||||||
self.tcp_pool.poll_ready()
|
self.tcp_pool.poll_ready(cx)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn call(&mut self, req: Connect) -> Self::Future {
|
fn call(&mut self, req: Connect) -> Self::Future {
|
||||||
match req.uri.scheme_str() {
|
match req.uri.scheme_str() {
|
||||||
Some("https") | Some("wss") => {
|
Some("https") | Some("wss") => Either::B(InnerConnectorResponseB {
|
||||||
Either::B(Either::B(InnerConnectorResponseB {
|
fut: self.ssl_pool.call(req),
|
||||||
fut: self.ssl_pool.call(req),
|
_t: PhantomData,
|
||||||
_t: PhantomData,
|
}),
|
||||||
}))
|
_ => Either::A(InnerConnectorResponseA {
|
||||||
}
|
|
||||||
_ => Either::B(Either::A(InnerConnectorResponseA {
|
|
||||||
fut: self.tcp_pool.call(req),
|
fut: self.tcp_pool.call(req),
|
||||||
_t: PhantomData,
|
_t: PhantomData,
|
||||||
})),
|
}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) struct InnerConnectorResponseA<T, Io1, Io2>
|
pub(crate) struct InnerConnectorResponseA<T, Io1, Io2>
|
||||||
where
|
where
|
||||||
Io1: AsyncRead + AsyncWrite + 'static,
|
Io1: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
T: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>
|
T: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>
|
||||||
+ Clone
|
+ Unpin
|
||||||
+ 'static,
|
+ 'static,
|
||||||
|
T::Future: Unpin,
|
||||||
{
|
{
|
||||||
fut: <ConnectionPool<T, Io1> as Service>::Future,
|
fut: <ConnectionPool<T, Io1> as Service>::Future,
|
||||||
_t: PhantomData<Io2>,
|
_t: PhantomData<Io2>,
|
||||||
@ -495,28 +537,29 @@ mod connect_impl {
|
|||||||
impl<T, Io1, Io2> Future for InnerConnectorResponseA<T, Io1, Io2>
|
impl<T, Io1, Io2> Future for InnerConnectorResponseA<T, Io1, Io2>
|
||||||
where
|
where
|
||||||
T: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>
|
T: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>
|
||||||
+ Clone
|
+ Unpin
|
||||||
+ 'static,
|
+ 'static,
|
||||||
Io1: AsyncRead + AsyncWrite + 'static,
|
T::Future: Unpin,
|
||||||
Io2: AsyncRead + AsyncWrite + 'static,
|
Io1: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
|
Io2: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
{
|
{
|
||||||
type Item = EitherConnection<Io1, Io2>;
|
type Output = Result<EitherConnection<Io1, Io2>, ConnectError>;
|
||||||
type Error = ConnectError;
|
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||||
match self.fut.poll()? {
|
Poll::Ready(
|
||||||
Async::NotReady => Ok(Async::NotReady),
|
ready!(Pin::new(&mut self.get_mut().fut).poll(cx))
|
||||||
Async::Ready(res) => Ok(Async::Ready(EitherConnection::A(res))),
|
.map(|res| EitherConnection::A(res)),
|
||||||
}
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) struct InnerConnectorResponseB<T, Io1, Io2>
|
pub(crate) struct InnerConnectorResponseB<T, Io1, Io2>
|
||||||
where
|
where
|
||||||
Io2: AsyncRead + AsyncWrite + 'static,
|
Io2: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
T: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>
|
T: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>
|
||||||
+ Clone
|
+ Unpin
|
||||||
+ 'static,
|
+ 'static,
|
||||||
|
T::Future: Unpin,
|
||||||
{
|
{
|
||||||
fut: <ConnectionPool<T, Io2> as Service>::Future,
|
fut: <ConnectionPool<T, Io2> as Service>::Future,
|
||||||
_t: PhantomData<Io1>,
|
_t: PhantomData<Io1>,
|
||||||
@ -525,19 +568,19 @@ mod connect_impl {
|
|||||||
impl<T, Io1, Io2> Future for InnerConnectorResponseB<T, Io1, Io2>
|
impl<T, Io1, Io2> Future for InnerConnectorResponseB<T, Io1, Io2>
|
||||||
where
|
where
|
||||||
T: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>
|
T: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>
|
||||||
+ Clone
|
+ Unpin
|
||||||
+ 'static,
|
+ 'static,
|
||||||
Io1: AsyncRead + AsyncWrite + 'static,
|
T::Future: Unpin,
|
||||||
Io2: AsyncRead + AsyncWrite + 'static,
|
Io1: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
|
Io2: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
{
|
{
|
||||||
type Item = EitherConnection<Io1, Io2>;
|
type Output = Result<EitherConnection<Io1, Io2>, ConnectError>;
|
||||||
type Error = ConnectError;
|
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||||
match self.fut.poll()? {
|
Poll::Ready(
|
||||||
Async::NotReady => Ok(Async::NotReady),
|
ready!(Pin::new(&mut self.get_mut().fut).poll(cx))
|
||||||
Async::Ready(res) => Ok(Async::Ready(EitherConnection::B(res))),
|
.map(|res| EitherConnection::B(res)),
|
||||||
}
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -6,8 +6,8 @@ use std::{io, time};
|
|||||||
|
|
||||||
use actix_codec::{AsyncRead, AsyncWrite, Framed};
|
use actix_codec::{AsyncRead, AsyncWrite, Framed};
|
||||||
use bytes::{BufMut, Bytes, BytesMut};
|
use bytes::{BufMut, Bytes, BytesMut};
|
||||||
use futures::future::{ok, Either};
|
use futures::future::{ok, poll_fn, Either};
|
||||||
use futures::{Sink, Stream};
|
use futures::{Sink, SinkExt, Stream, StreamExt};
|
||||||
|
|
||||||
use crate::error::PayloadError;
|
use crate::error::PayloadError;
|
||||||
use crate::h1;
|
use crate::h1;
|
||||||
@ -21,15 +21,15 @@ use super::error::{ConnectError, SendRequestError};
|
|||||||
use super::pool::Acquired;
|
use super::pool::Acquired;
|
||||||
use crate::body::{BodySize, MessageBody};
|
use crate::body::{BodySize, MessageBody};
|
||||||
|
|
||||||
pub(crate) fn send_request<T, B>(
|
pub(crate) async fn send_request<T, B>(
|
||||||
io: T,
|
io: T,
|
||||||
mut head: RequestHeadType,
|
mut head: RequestHeadType,
|
||||||
body: B,
|
body: B,
|
||||||
created: time::Instant,
|
created: time::Instant,
|
||||||
pool: Option<Acquired<T>>,
|
pool: Option<Acquired<T>>,
|
||||||
) -> impl Future<Item = (ResponseHead, Payload), Error = SendRequestError>
|
) -> Result<(ResponseHead, Payload), SendRequestError>
|
||||||
where
|
where
|
||||||
T: AsyncRead + AsyncWrite + 'static,
|
T: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
B: MessageBody,
|
B: MessageBody,
|
||||||
{
|
{
|
||||||
// set request host header
|
// set request host header
|
||||||
@ -65,68 +65,98 @@ where
|
|||||||
io: Some(io),
|
io: Some(io),
|
||||||
};
|
};
|
||||||
|
|
||||||
let len = body.size();
|
|
||||||
|
|
||||||
// create Framed and send request
|
// create Framed and send request
|
||||||
Framed::new(io, h1::ClientCodec::default())
|
let mut framed = Framed::new(io, h1::ClientCodec::default());
|
||||||
.send((head, len).into())
|
framed.send((head, body.size()).into()).await?;
|
||||||
.from_err()
|
|
||||||
// send request body
|
// send request body
|
||||||
.and_then(move |framed| match body.size() {
|
match body.size() {
|
||||||
BodySize::None | BodySize::Empty | BodySize::Sized(0) => {
|
BodySize::None | BodySize::Empty | BodySize::Sized(0) => (),
|
||||||
Either::A(ok(framed))
|
_ => send_body(body, &mut framed).await?,
|
||||||
}
|
};
|
||||||
_ => Either::B(SendBody::new(body, framed)),
|
|
||||||
})
|
// read response and init read body
|
||||||
// read response and init read body
|
let (head, framed) = if let (Some(result), framed) = framed.into_future().await {
|
||||||
.and_then(|framed| {
|
let item = result.map_err(SendRequestError::from)?;
|
||||||
framed
|
(item, framed)
|
||||||
.into_future()
|
} else {
|
||||||
.map_err(|(e, _)| SendRequestError::from(e))
|
return Err(SendRequestError::from(ConnectError::Disconnected));
|
||||||
.and_then(|(item, framed)| {
|
};
|
||||||
if let Some(res) = item {
|
|
||||||
match framed.get_codec().message_type() {
|
match framed.get_codec().message_type() {
|
||||||
h1::MessageType::None => {
|
h1::MessageType::None => {
|
||||||
let force_close = !framed.get_codec().keepalive();
|
let force_close = !framed.get_codec().keepalive();
|
||||||
release_connection(framed, force_close);
|
release_connection(framed, force_close);
|
||||||
Ok((res, Payload::None))
|
Ok((head, Payload::None))
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
let pl: PayloadStream = Box::new(PlStream::new(framed));
|
let pl: PayloadStream = PlStream::new(framed).boxed_local();
|
||||||
Ok((res, pl.into()))
|
Ok((head, pl.into()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
Err(ConnectError::Disconnected.into())
|
|
||||||
}
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn open_tunnel<T>(
|
pub(crate) async fn open_tunnel<T>(
|
||||||
io: T,
|
io: T,
|
||||||
head: RequestHeadType,
|
head: RequestHeadType,
|
||||||
) -> impl Future<Item = (ResponseHead, Framed<T, h1::ClientCodec>), Error = SendRequestError>
|
) -> Result<(ResponseHead, Framed<T, h1::ClientCodec>), SendRequestError>
|
||||||
where
|
where
|
||||||
T: AsyncRead + AsyncWrite + 'static,
|
T: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
{
|
{
|
||||||
// create Framed and send request
|
// create Framed and send request
|
||||||
Framed::new(io, h1::ClientCodec::default())
|
let mut framed = Framed::new(io, h1::ClientCodec::default());
|
||||||
.send((head, BodySize::None).into())
|
framed.send((head, BodySize::None).into()).await?;
|
||||||
.from_err()
|
|
||||||
// read response
|
// read response
|
||||||
.and_then(|framed| {
|
if let (Some(result), framed) = framed.into_future().await {
|
||||||
framed
|
let head = result.map_err(SendRequestError::from)?;
|
||||||
.into_future()
|
Ok((head, framed))
|
||||||
.map_err(|(e, _)| SendRequestError::from(e))
|
} else {
|
||||||
.and_then(|(head, framed)| {
|
Err(SendRequestError::from(ConnectError::Disconnected))
|
||||||
if let Some(head) = head {
|
}
|
||||||
Ok((head, framed))
|
}
|
||||||
|
|
||||||
|
/// send request body to the peer
|
||||||
|
pub(crate) async fn send_body<I, B>(
|
||||||
|
mut body: B,
|
||||||
|
framed: &mut Framed<I, h1::ClientCodec>,
|
||||||
|
) -> Result<(), SendRequestError>
|
||||||
|
where
|
||||||
|
I: ConnectionLifetime,
|
||||||
|
B: MessageBody,
|
||||||
|
{
|
||||||
|
let mut eof = false;
|
||||||
|
while !eof {
|
||||||
|
while !eof && !framed.is_write_buf_full() {
|
||||||
|
match poll_fn(|cx| body.poll_next(cx)).await {
|
||||||
|
Some(result) => {
|
||||||
|
framed.write(h1::Message::Chunk(Some(result?)))?;
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
eof = true;
|
||||||
|
framed.write(h1::Message::Chunk(None))?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !framed.is_write_buf_empty() {
|
||||||
|
poll_fn(|cx| match framed.flush(cx) {
|
||||||
|
Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
|
||||||
|
Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
|
||||||
|
Poll::Pending => {
|
||||||
|
if !framed.is_write_buf_full() {
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
} else {
|
} else {
|
||||||
Err(SendRequestError::from(ConnectError::Disconnected))
|
Poll::Pending
|
||||||
}
|
}
|
||||||
})
|
}
|
||||||
})
|
})
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SinkExt::flush(framed).await?;
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
@ -137,7 +167,10 @@ pub struct H1Connection<T> {
|
|||||||
pool: Option<Acquired<T>>,
|
pool: Option<Acquired<T>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: AsyncRead + AsyncWrite + 'static> ConnectionLifetime for H1Connection<T> {
|
impl<T> ConnectionLifetime for H1Connection<T>
|
||||||
|
where
|
||||||
|
T: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
|
{
|
||||||
/// Close connection
|
/// Close connection
|
||||||
fn close(&mut self) {
|
fn close(&mut self) {
|
||||||
if let Some(mut pool) = self.pool.take() {
|
if let Some(mut pool) = self.pool.take() {
|
||||||
@ -165,98 +198,41 @@ impl<T: AsyncRead + AsyncWrite + 'static> ConnectionLifetime for H1Connection<T>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: AsyncRead + AsyncWrite + 'static> io::Read for H1Connection<T> {
|
impl<T: AsyncRead + AsyncWrite + Unpin + 'static> AsyncRead for H1Connection<T> {
|
||||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
|
||||||
self.io.as_mut().unwrap().read(buf)
|
self.io.as_ref().unwrap().prepare_uninitialized_buffer(buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_read(
|
||||||
|
mut self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &mut [u8],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
Pin::new(&mut self.io.as_mut().unwrap()).poll_read(cx, buf)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: AsyncRead + AsyncWrite + 'static> AsyncRead for H1Connection<T> {}
|
impl<T: AsyncRead + AsyncWrite + Unpin + 'static> AsyncWrite for H1Connection<T> {
|
||||||
|
fn poll_write(
|
||||||
impl<T: AsyncRead + AsyncWrite + 'static> io::Write for H1Connection<T> {
|
mut self: Pin<&mut Self>,
|
||||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
cx: &mut Context<'_>,
|
||||||
self.io.as_mut().unwrap().write(buf)
|
buf: &[u8],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
Pin::new(&mut self.io.as_mut().unwrap()).poll_write(cx, buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn flush(&mut self) -> io::Result<()> {
|
fn poll_flush(
|
||||||
self.io.as_mut().unwrap().flush()
|
mut self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
) -> Poll<io::Result<()>> {
|
||||||
|
Pin::new(self.io.as_mut().unwrap()).poll_flush(cx)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: AsyncRead + AsyncWrite + 'static> AsyncWrite for H1Connection<T> {
|
fn poll_shutdown(
|
||||||
fn shutdown(&mut self) -> Poll<(), io::Error> {
|
mut self: Pin<&mut Self>,
|
||||||
self.io.as_mut().unwrap().shutdown()
|
cx: &mut Context,
|
||||||
}
|
) -> Poll<Result<(), io::Error>> {
|
||||||
}
|
Pin::new(self.io.as_mut().unwrap()).poll_shutdown(cx)
|
||||||
|
|
||||||
/// Future responsible for sending request body to the peer
|
|
||||||
pub(crate) struct SendBody<I, B> {
|
|
||||||
body: Option<B>,
|
|
||||||
framed: Option<Framed<I, h1::ClientCodec>>,
|
|
||||||
flushed: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<I, B> SendBody<I, B>
|
|
||||||
where
|
|
||||||
I: AsyncRead + AsyncWrite + 'static,
|
|
||||||
B: MessageBody,
|
|
||||||
{
|
|
||||||
pub(crate) fn new(body: B, framed: Framed<I, h1::ClientCodec>) -> Self {
|
|
||||||
SendBody {
|
|
||||||
body: Some(body),
|
|
||||||
framed: Some(framed),
|
|
||||||
flushed: true,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<I, B> Future for SendBody<I, B>
|
|
||||||
where
|
|
||||||
I: ConnectionLifetime,
|
|
||||||
B: MessageBody,
|
|
||||||
{
|
|
||||||
type Item = Framed<I, h1::ClientCodec>;
|
|
||||||
type Error = SendRequestError;
|
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
|
||||||
let mut body_ready = true;
|
|
||||||
loop {
|
|
||||||
while body_ready
|
|
||||||
&& self.body.is_some()
|
|
||||||
&& !self.framed.as_ref().unwrap().is_write_buf_full()
|
|
||||||
{
|
|
||||||
match self.body.as_mut().unwrap().poll_next()? {
|
|
||||||
Async::Ready(item) => {
|
|
||||||
// check if body is done
|
|
||||||
if item.is_none() {
|
|
||||||
let _ = self.body.take();
|
|
||||||
}
|
|
||||||
self.flushed = false;
|
|
||||||
self.framed
|
|
||||||
.as_mut()
|
|
||||||
.unwrap()
|
|
||||||
.force_send(h1::Message::Chunk(item))?;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
Async::NotReady => body_ready = false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !self.flushed {
|
|
||||||
match self.framed.as_mut().unwrap().poll_complete()? {
|
|
||||||
Async::Ready(_) => {
|
|
||||||
self.flushed = true;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
Async::NotReady => return Ok(Async::NotReady),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if self.body.is_none() {
|
|
||||||
return Ok(Async::Ready(self.framed.take().unwrap()));
|
|
||||||
}
|
|
||||||
return Ok(Async::NotReady);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -273,23 +249,24 @@ impl<Io: ConnectionLifetime> PlStream<Io> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<Io: ConnectionLifetime> Stream for PlStream<Io> {
|
impl<Io: ConnectionLifetime> Stream for PlStream<Io> {
|
||||||
type Item = Bytes;
|
type Item = Result<Bytes, PayloadError>;
|
||||||
type Error = PayloadError;
|
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||||
match self.framed.as_mut().unwrap().poll()? {
|
let this = self.get_mut();
|
||||||
Async::NotReady => Ok(Async::NotReady),
|
|
||||||
Async::Ready(Some(chunk)) => {
|
match this.framed.as_mut().unwrap().next_item(cx)? {
|
||||||
|
Poll::Pending => Poll::Pending,
|
||||||
|
Poll::Ready(Some(chunk)) => {
|
||||||
if let Some(chunk) = chunk {
|
if let Some(chunk) = chunk {
|
||||||
Ok(Async::Ready(Some(chunk)))
|
Poll::Ready(Some(Ok(chunk)))
|
||||||
} else {
|
} else {
|
||||||
let framed = self.framed.take().unwrap();
|
let framed = this.framed.take().unwrap();
|
||||||
let force_close = !framed.get_codec().keepalive();
|
let force_close = !framed.get_codec().keepalive();
|
||||||
release_connection(framed, force_close);
|
release_connection(framed, force_close);
|
||||||
Ok(Async::Ready(None))
|
Poll::Ready(None)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Async::Ready(None) => Ok(Async::Ready(None)),
|
Poll::Ready(None) => Poll::Ready(None),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -5,7 +5,7 @@ use std::time;
|
|||||||
|
|
||||||
use actix_codec::{AsyncRead, AsyncWrite};
|
use actix_codec::{AsyncRead, AsyncWrite};
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use futures::future::{err, Either};
|
use futures::future::{err, poll_fn, Either};
|
||||||
use h2::{client::SendRequest, SendStream};
|
use h2::{client::SendRequest, SendStream};
|
||||||
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, TRANSFER_ENCODING};
|
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, TRANSFER_ENCODING};
|
||||||
use http::{request::Request, HttpTryFrom, Method, Version};
|
use http::{request::Request, HttpTryFrom, Method, Version};
|
||||||
@ -19,15 +19,15 @@ use super::connection::{ConnectionType, IoConnection};
|
|||||||
use super::error::SendRequestError;
|
use super::error::SendRequestError;
|
||||||
use super::pool::Acquired;
|
use super::pool::Acquired;
|
||||||
|
|
||||||
pub(crate) fn send_request<T, B>(
|
pub(crate) async fn send_request<T, B>(
|
||||||
io: SendRequest<Bytes>,
|
mut io: SendRequest<Bytes>,
|
||||||
head: RequestHeadType,
|
head: RequestHeadType,
|
||||||
body: B,
|
body: B,
|
||||||
created: time::Instant,
|
created: time::Instant,
|
||||||
pool: Option<Acquired<T>>,
|
pool: Option<Acquired<T>>,
|
||||||
) -> impl Future<Item = (ResponseHead, Payload), Error = SendRequestError>
|
) -> Result<(ResponseHead, Payload), SendRequestError>
|
||||||
where
|
where
|
||||||
T: AsyncRead + AsyncWrite + 'static,
|
T: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
B: MessageBody,
|
B: MessageBody,
|
||||||
{
|
{
|
||||||
trace!("Sending client request: {:?} {:?}", head, body.size());
|
trace!("Sending client request: {:?} {:?}", head, body.size());
|
||||||
@ -38,158 +38,138 @@ where
|
|||||||
_ => false,
|
_ => false,
|
||||||
};
|
};
|
||||||
|
|
||||||
io.ready()
|
let mut req = Request::new(());
|
||||||
.map_err(SendRequestError::from)
|
*req.uri_mut() = head.as_ref().uri.clone();
|
||||||
.and_then(move |mut io| {
|
*req.method_mut() = head.as_ref().method.clone();
|
||||||
let mut req = Request::new(());
|
*req.version_mut() = Version::HTTP_2;
|
||||||
*req.uri_mut() = head.as_ref().uri.clone();
|
|
||||||
*req.method_mut() = head.as_ref().method.clone();
|
|
||||||
*req.version_mut() = Version::HTTP_2;
|
|
||||||
|
|
||||||
let mut skip_len = true;
|
let mut skip_len = true;
|
||||||
// let mut has_date = false;
|
// let mut has_date = false;
|
||||||
|
|
||||||
// Content length
|
// Content length
|
||||||
let _ = match length {
|
let _ = match length {
|
||||||
BodySize::None => None,
|
BodySize::None => None,
|
||||||
BodySize::Stream => {
|
BodySize::Stream => {
|
||||||
skip_len = false;
|
skip_len = false;
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
BodySize::Empty => req
|
BodySize::Empty => req
|
||||||
.headers_mut()
|
.headers_mut()
|
||||||
.insert(CONTENT_LENGTH, HeaderValue::from_static("0")),
|
.insert(CONTENT_LENGTH, HeaderValue::from_static("0")),
|
||||||
BodySize::Sized(len) => req.headers_mut().insert(
|
BodySize::Sized(len) => req.headers_mut().insert(
|
||||||
CONTENT_LENGTH,
|
CONTENT_LENGTH,
|
||||||
HeaderValue::try_from(format!("{}", len)).unwrap(),
|
HeaderValue::try_from(format!("{}", len)).unwrap(),
|
||||||
),
|
),
|
||||||
BodySize::Sized64(len) => req.headers_mut().insert(
|
BodySize::Sized64(len) => req.headers_mut().insert(
|
||||||
CONTENT_LENGTH,
|
CONTENT_LENGTH,
|
||||||
HeaderValue::try_from(format!("{}", len)).unwrap(),
|
HeaderValue::try_from(format!("{}", len)).unwrap(),
|
||||||
),
|
),
|
||||||
};
|
};
|
||||||
|
|
||||||
// Extracting extra headers from RequestHeadType. HeaderMap::new() does not allocate.
|
// Extracting extra headers from RequestHeadType. HeaderMap::new() does not allocate.
|
||||||
let (head, extra_headers) = match head {
|
let (head, extra_headers) = match head {
|
||||||
RequestHeadType::Owned(head) => {
|
RequestHeadType::Owned(head) => (RequestHeadType::Owned(head), HeaderMap::new()),
|
||||||
(RequestHeadType::Owned(head), HeaderMap::new())
|
RequestHeadType::Rc(head, extra_headers) => (
|
||||||
}
|
RequestHeadType::Rc(head, None),
|
||||||
RequestHeadType::Rc(head, extra_headers) => (
|
extra_headers.unwrap_or_else(HeaderMap::new),
|
||||||
RequestHeadType::Rc(head, None),
|
),
|
||||||
extra_headers.unwrap_or_else(HeaderMap::new),
|
};
|
||||||
),
|
|
||||||
};
|
|
||||||
|
|
||||||
// merging headers from head and extra headers.
|
// merging headers from head and extra headers.
|
||||||
let headers = head
|
let headers = head
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.headers
|
.headers
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|(name, _)| !extra_headers.contains_key(*name))
|
.filter(|(name, _)| !extra_headers.contains_key(*name))
|
||||||
.chain(extra_headers.iter());
|
.chain(extra_headers.iter());
|
||||||
|
|
||||||
// copy headers
|
// copy headers
|
||||||
for (key, value) in headers {
|
for (key, value) in headers {
|
||||||
match *key {
|
match *key {
|
||||||
CONNECTION | TRANSFER_ENCODING => continue, // http2 specific
|
CONNECTION | TRANSFER_ENCODING => continue, // http2 specific
|
||||||
CONTENT_LENGTH if skip_len => continue,
|
CONTENT_LENGTH if skip_len => continue,
|
||||||
// DATE => has_date = true,
|
// DATE => has_date = true,
|
||||||
_ => (),
|
_ => (),
|
||||||
}
|
}
|
||||||
req.headers_mut().append(key, value.clone());
|
req.headers_mut().append(key, value.clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
let res = poll_fn(|cx| io.poll_ready(cx)).await;
|
||||||
|
if let Err(e) = res {
|
||||||
|
release(io, pool, created, e.is_io());
|
||||||
|
return Err(SendRequestError::from(e));
|
||||||
|
}
|
||||||
|
|
||||||
|
let resp = match io.send_request(req, eof) {
|
||||||
|
Ok((fut, send)) => {
|
||||||
|
release(io, pool, created, false);
|
||||||
|
|
||||||
|
if !eof {
|
||||||
|
send_body(body, send).await?;
|
||||||
}
|
}
|
||||||
|
fut.await.map_err(SendRequestError::from)?
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
release(io, pool, created, e.is_io());
|
||||||
|
return Err(e.into());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
match io.send_request(req, eof) {
|
let (parts, body) = resp.into_parts();
|
||||||
Ok((res, send)) => {
|
let payload = if head_req { Payload::None } else { body.into() };
|
||||||
release(io, pool, created, false);
|
|
||||||
|
|
||||||
if !eof {
|
let mut head = ResponseHead::new(parts.status);
|
||||||
Either::A(Either::B(
|
head.version = parts.version;
|
||||||
SendBody {
|
head.headers = parts.headers.into();
|
||||||
body,
|
Ok((head, payload))
|
||||||
send,
|
|
||||||
buf: None,
|
|
||||||
}
|
|
||||||
.and_then(move |_| res.map_err(SendRequestError::from)),
|
|
||||||
))
|
|
||||||
} else {
|
|
||||||
Either::B(res.map_err(SendRequestError::from))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
release(io, pool, created, e.is_io());
|
|
||||||
Either::A(Either::A(err(e.into())))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.and_then(move |resp| {
|
|
||||||
let (parts, body) = resp.into_parts();
|
|
||||||
let payload = if head_req { Payload::None } else { body.into() };
|
|
||||||
|
|
||||||
let mut head = ResponseHead::new(parts.status);
|
|
||||||
head.version = parts.version;
|
|
||||||
head.headers = parts.headers.into();
|
|
||||||
Ok((head, payload))
|
|
||||||
})
|
|
||||||
.from_err()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
struct SendBody<B: MessageBody> {
|
async fn send_body<B: MessageBody>(
|
||||||
body: B,
|
mut body: B,
|
||||||
send: SendStream<Bytes>,
|
mut send: SendStream<Bytes>,
|
||||||
buf: Option<Bytes>,
|
) -> Result<(), SendRequestError> {
|
||||||
}
|
let mut buf = None;
|
||||||
|
loop {
|
||||||
impl<B: MessageBody> Future for SendBody<B> {
|
if buf.is_none() {
|
||||||
type Item = ();
|
match poll_fn(|cx| body.poll_next(cx)).await {
|
||||||
type Error = SendRequestError;
|
Some(Ok(b)) => {
|
||||||
|
send.reserve_capacity(b.len());
|
||||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
buf = Some(b);
|
||||||
loop {
|
|
||||||
if self.buf.is_none() {
|
|
||||||
match self.body.poll_next() {
|
|
||||||
Ok(Async::Ready(Some(buf))) => {
|
|
||||||
self.send.reserve_capacity(buf.len());
|
|
||||||
self.buf = Some(buf);
|
|
||||||
}
|
|
||||||
Ok(Async::Ready(None)) => {
|
|
||||||
if let Err(e) = self.send.send_data(Bytes::new(), true) {
|
|
||||||
return Err(e.into());
|
|
||||||
}
|
|
||||||
self.send.reserve_capacity(0);
|
|
||||||
return Ok(Async::Ready(()));
|
|
||||||
}
|
|
||||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
|
||||||
Err(e) => return Err(e.into()),
|
|
||||||
}
|
}
|
||||||
}
|
Some(Err(e)) => return Err(e.into()),
|
||||||
|
None => {
|
||||||
match self.send.poll_capacity() {
|
if let Err(e) = send.send_data(Bytes::new(), true) {
|
||||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
|
||||||
Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
|
|
||||||
Ok(Async::Ready(Some(cap))) => {
|
|
||||||
let mut buf = self.buf.take().unwrap();
|
|
||||||
let len = buf.len();
|
|
||||||
let bytes = buf.split_to(std::cmp::min(cap, len));
|
|
||||||
|
|
||||||
if let Err(e) = self.send.send_data(bytes, false) {
|
|
||||||
return Err(e.into());
|
return Err(e.into());
|
||||||
} else {
|
|
||||||
if !buf.is_empty() {
|
|
||||||
self.send.reserve_capacity(buf.len());
|
|
||||||
self.buf = Some(buf);
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
|
send.reserve_capacity(0);
|
||||||
|
return Ok(());
|
||||||
}
|
}
|
||||||
Err(e) => return Err(e.into()),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
match poll_fn(|cx| send.poll_capacity(cx)).await {
|
||||||
|
None => return Ok(()),
|
||||||
|
Some(Ok(cap)) => {
|
||||||
|
let b = buf.as_mut().unwrap();
|
||||||
|
let len = b.len();
|
||||||
|
let bytes = b.split_to(std::cmp::min(cap, len));
|
||||||
|
|
||||||
|
if let Err(e) = send.send_data(bytes, false) {
|
||||||
|
return Err(e.into());
|
||||||
|
} else {
|
||||||
|
if !b.is_empty() {
|
||||||
|
send.reserve_capacity(b.len());
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(Err(e)) => return Err(e.into()),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// release SendRequest object
|
// release SendRequest object
|
||||||
fn release<T: AsyncRead + AsyncWrite + 'static>(
|
fn release<T: AsyncRead + AsyncWrite + Unpin + 'static>(
|
||||||
io: SendRequest<Bytes>,
|
io: SendRequest<Bytes>,
|
||||||
pool: Option<Acquired<T>>,
|
pool: Option<Acquired<T>>,
|
||||||
created: time::Instant,
|
created: time::Instant,
|
||||||
|
@ -9,11 +9,10 @@ use std::time::{Duration, Instant};
|
|||||||
|
|
||||||
use actix_codec::{AsyncRead, AsyncWrite};
|
use actix_codec::{AsyncRead, AsyncWrite};
|
||||||
use actix_service::Service;
|
use actix_service::Service;
|
||||||
use actix_utils::oneshot;
|
use actix_utils::{oneshot, task::LocalWaker};
|
||||||
use actix_utils::task::LocalWaker;
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use futures::future::{err, ok, Either, FutureResult};
|
use futures::future::{err, ok, poll_fn, Either, FutureExt, LocalBoxFuture, Ready};
|
||||||
use h2::client::{handshake, Handshake};
|
use h2::client::{handshake, Connection, SendRequest};
|
||||||
use hashbrown::HashMap;
|
use hashbrown::HashMap;
|
||||||
use http::uri::Authority;
|
use http::uri::Authority;
|
||||||
use indexmap::IndexSet;
|
use indexmap::IndexSet;
|
||||||
@ -43,17 +42,15 @@ impl From<Authority> for Key {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Connections pool
|
/// Connections pool
|
||||||
pub(crate) struct ConnectionPool<T, Io: AsyncRead + AsyncWrite + 'static>(
|
pub(crate) struct ConnectionPool<T, Io: 'static>(Rc<RefCell<T>>, Rc<RefCell<Inner<Io>>>);
|
||||||
T,
|
|
||||||
Rc<RefCell<Inner<Io>>>,
|
|
||||||
);
|
|
||||||
|
|
||||||
impl<T, Io> ConnectionPool<T, Io>
|
impl<T, Io> ConnectionPool<T, Io>
|
||||||
where
|
where
|
||||||
Io: AsyncRead + AsyncWrite + 'static,
|
Io: AsyncRead + AsyncWrite + 'static,
|
||||||
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
|
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
|
||||||
+ Clone
|
+ Unpin
|
||||||
+ 'static,
|
+ 'static,
|
||||||
|
T::Future: Unpin,
|
||||||
{
|
{
|
||||||
pub(crate) fn new(
|
pub(crate) fn new(
|
||||||
connector: T,
|
connector: T,
|
||||||
@ -63,7 +60,7 @@ where
|
|||||||
limit: usize,
|
limit: usize,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
ConnectionPool(
|
ConnectionPool(
|
||||||
connector,
|
Rc::new(RefCell::new(connector)),
|
||||||
Rc::new(RefCell::new(Inner {
|
Rc::new(RefCell::new(Inner {
|
||||||
conn_lifetime,
|
conn_lifetime,
|
||||||
conn_keep_alive,
|
conn_keep_alive,
|
||||||
@ -73,7 +70,7 @@ where
|
|||||||
waiters: Slab::new(),
|
waiters: Slab::new(),
|
||||||
waiters_queue: IndexSet::new(),
|
waiters_queue: IndexSet::new(),
|
||||||
available: HashMap::new(),
|
available: HashMap::new(),
|
||||||
task: None,
|
waker: LocalWaker::new(),
|
||||||
})),
|
})),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
@ -81,8 +78,7 @@ where
|
|||||||
|
|
||||||
impl<T, Io> Clone for ConnectionPool<T, Io>
|
impl<T, Io> Clone for ConnectionPool<T, Io>
|
||||||
where
|
where
|
||||||
T: Clone,
|
Io: 'static,
|
||||||
Io: AsyncRead + AsyncWrite + 'static,
|
|
||||||
{
|
{
|
||||||
fn clone(&self) -> Self {
|
fn clone(&self) -> Self {
|
||||||
ConnectionPool(self.0.clone(), self.1.clone())
|
ConnectionPool(self.0.clone(), self.1.clone())
|
||||||
@ -91,86 +87,118 @@ where
|
|||||||
|
|
||||||
impl<T, Io> Service for ConnectionPool<T, Io>
|
impl<T, Io> Service for ConnectionPool<T, Io>
|
||||||
where
|
where
|
||||||
Io: AsyncRead + AsyncWrite + 'static,
|
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
|
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
|
||||||
+ Clone
|
+ Unpin
|
||||||
+ 'static,
|
+ 'static,
|
||||||
|
T::Future: Unpin,
|
||||||
{
|
{
|
||||||
type Request = Connect;
|
type Request = Connect;
|
||||||
type Response = IoConnection<Io>;
|
type Response = IoConnection<Io>;
|
||||||
type Error = ConnectError;
|
type Error = ConnectError;
|
||||||
type Future = Either<
|
type Future = LocalBoxFuture<'static, Result<IoConnection<Io>, ConnectError>>;
|
||||||
FutureResult<Self::Response, Self::Error>,
|
|
||||||
Either<WaitForConnection<Io>, OpenConnection<T::Future, Io>>,
|
|
||||||
>;
|
|
||||||
|
|
||||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
|
||||||
self.0.poll_ready()
|
self.0.poll_ready(cx)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn call(&mut self, req: Connect) -> Self::Future {
|
fn call(&mut self, req: Connect) -> Self::Future {
|
||||||
let key = if let Some(authority) = req.uri.authority_part() {
|
// start support future
|
||||||
authority.clone().into()
|
tokio_executor::current_thread::spawn(ConnectorPoolSupport {
|
||||||
} else {
|
connector: self.0.clone(),
|
||||||
return Either::A(err(ConnectError::Unresolverd));
|
inner: self.1.clone(),
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut connector = self.0.clone();
|
||||||
|
let inner = self.1.clone();
|
||||||
|
|
||||||
|
let fut = async move {
|
||||||
|
let key = if let Some(authority) = req.uri.authority_part() {
|
||||||
|
authority.clone().into()
|
||||||
|
} else {
|
||||||
|
return Err(ConnectError::Unresolverd);
|
||||||
|
};
|
||||||
|
|
||||||
|
// acquire connection
|
||||||
|
match poll_fn(|cx| Poll::Ready(inner.borrow_mut().acquire(&key, cx))).await {
|
||||||
|
Acquire::Acquired(io, created) => {
|
||||||
|
// use existing connection
|
||||||
|
return Ok(IoConnection::new(
|
||||||
|
io,
|
||||||
|
created,
|
||||||
|
Some(Acquired(key, Some(inner))),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
Acquire::Available => {
|
||||||
|
// open tcp connection
|
||||||
|
let (io, proto) = connector.call(req).await?;
|
||||||
|
|
||||||
|
let guard = OpenGuard::new(key, inner);
|
||||||
|
|
||||||
|
if proto == Protocol::Http1 {
|
||||||
|
Ok(IoConnection::new(
|
||||||
|
ConnectionType::H1(io),
|
||||||
|
Instant::now(),
|
||||||
|
Some(guard.consume()),
|
||||||
|
))
|
||||||
|
} else {
|
||||||
|
let (snd, connection) = handshake(io).await?;
|
||||||
|
tokio_executor::current_thread::spawn(connection.map(|_| ()));
|
||||||
|
Ok(IoConnection::new(
|
||||||
|
ConnectionType::H2(snd),
|
||||||
|
Instant::now(),
|
||||||
|
Some(guard.consume()),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
// connection is not available, wait
|
||||||
|
let (rx, token) = inner.borrow_mut().wait_for(req);
|
||||||
|
|
||||||
|
let guard = WaiterGuard::new(key, token, inner);
|
||||||
|
let res = match rx.await {
|
||||||
|
Err(_) => Err(ConnectError::Disconnected),
|
||||||
|
Ok(res) => res,
|
||||||
|
};
|
||||||
|
guard.consume();
|
||||||
|
res
|
||||||
|
}
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// acquire connection
|
fut.boxed_local()
|
||||||
match self.1.as_ref().borrow_mut().acquire(&key) {
|
|
||||||
Acquire::Acquired(io, created) => {
|
|
||||||
// use existing connection
|
|
||||||
return Either::A(ok(IoConnection::new(
|
|
||||||
io,
|
|
||||||
created,
|
|
||||||
Some(Acquired(key, Some(self.1.clone()))),
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
Acquire::Available => {
|
|
||||||
// open new connection
|
|
||||||
return Either::B(Either::B(OpenConnection::new(
|
|
||||||
key,
|
|
||||||
self.1.clone(),
|
|
||||||
self.0.call(req),
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
_ => (),
|
|
||||||
}
|
|
||||||
|
|
||||||
// connection is not available, wait
|
|
||||||
let (rx, token, support) = self.1.as_ref().borrow_mut().wait_for(req);
|
|
||||||
|
|
||||||
// start support future
|
|
||||||
if !support {
|
|
||||||
self.1.as_ref().borrow_mut().task = Some(AtomicTask::new());
|
|
||||||
tokio_executor::current_thread::spawn(ConnectorPoolSupport {
|
|
||||||
connector: self.0.clone(),
|
|
||||||
inner: self.1.clone(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
Either::B(Either::A(WaitForConnection {
|
|
||||||
rx,
|
|
||||||
key,
|
|
||||||
token,
|
|
||||||
inner: Some(self.1.clone()),
|
|
||||||
}))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[doc(hidden)]
|
struct WaiterGuard<Io>
|
||||||
pub struct WaitForConnection<Io>
|
|
||||||
where
|
where
|
||||||
Io: AsyncRead + AsyncWrite + 'static,
|
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
{
|
{
|
||||||
key: Key,
|
key: Key,
|
||||||
token: usize,
|
token: usize,
|
||||||
rx: oneshot::Receiver<Result<IoConnection<Io>, ConnectError>>,
|
|
||||||
inner: Option<Rc<RefCell<Inner<Io>>>>,
|
inner: Option<Rc<RefCell<Inner<Io>>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Io> Drop for WaitForConnection<Io>
|
impl<Io> WaiterGuard<Io>
|
||||||
where
|
where
|
||||||
Io: AsyncRead + AsyncWrite + 'static,
|
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
|
{
|
||||||
|
fn new(key: Key, token: usize, inner: Rc<RefCell<Inner<Io>>>) -> Self {
|
||||||
|
Self {
|
||||||
|
key,
|
||||||
|
token,
|
||||||
|
inner: Some(inner),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn consume(mut self) {
|
||||||
|
let _ = self.inner.take();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Io> Drop for WaiterGuard<Io>
|
||||||
|
where
|
||||||
|
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
{
|
{
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
if let Some(i) = self.inner.take() {
|
if let Some(i) = self.inner.take() {
|
||||||
@ -181,113 +209,43 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Io> Future for WaitForConnection<Io>
|
struct OpenGuard<Io>
|
||||||
where
|
where
|
||||||
Io: AsyncRead + AsyncWrite,
|
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
{
|
{
|
||||||
type Item = IoConnection<Io>;
|
|
||||||
type Error = ConnectError;
|
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
|
||||||
match self.rx.poll() {
|
|
||||||
Ok(Async::Ready(item)) => match item {
|
|
||||||
Err(err) => Err(err),
|
|
||||||
Ok(conn) => {
|
|
||||||
let _ = self.inner.take();
|
|
||||||
Ok(Async::Ready(conn))
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Ok(Async::NotReady) => Ok(Async::NotReady),
|
|
||||||
Err(_) => {
|
|
||||||
let _ = self.inner.take();
|
|
||||||
Err(ConnectError::Disconnected)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[doc(hidden)]
|
|
||||||
pub struct OpenConnection<F, Io>
|
|
||||||
where
|
|
||||||
Io: AsyncRead + AsyncWrite + 'static,
|
|
||||||
{
|
|
||||||
fut: F,
|
|
||||||
key: Key,
|
key: Key,
|
||||||
h2: Option<Handshake<Io, Bytes>>,
|
|
||||||
inner: Option<Rc<RefCell<Inner<Io>>>>,
|
inner: Option<Rc<RefCell<Inner<Io>>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<F, Io> OpenConnection<F, Io>
|
impl<Io> OpenGuard<Io>
|
||||||
where
|
where
|
||||||
F: Future<Item = (Io, Protocol), Error = ConnectError>,
|
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
Io: AsyncRead + AsyncWrite + 'static,
|
|
||||||
{
|
{
|
||||||
fn new(key: Key, inner: Rc<RefCell<Inner<Io>>>, fut: F) -> Self {
|
fn new(key: Key, inner: Rc<RefCell<Inner<Io>>>) -> Self {
|
||||||
OpenConnection {
|
Self {
|
||||||
key,
|
key,
|
||||||
fut,
|
|
||||||
inner: Some(inner),
|
inner: Some(inner),
|
||||||
h2: None,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn consume(mut self) -> Acquired<Io> {
|
||||||
|
Acquired(self.key.clone(), self.inner.take())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<F, Io> Drop for OpenConnection<F, Io>
|
impl<Io> Drop for OpenGuard<Io>
|
||||||
where
|
where
|
||||||
Io: AsyncRead + AsyncWrite + 'static,
|
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
{
|
{
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
if let Some(inner) = self.inner.take() {
|
if let Some(i) = self.inner.take() {
|
||||||
let mut inner = inner.as_ref().borrow_mut();
|
let mut inner = i.as_ref().borrow_mut();
|
||||||
inner.release();
|
inner.release();
|
||||||
inner.check_availibility();
|
inner.check_availibility();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<F, Io> Future for OpenConnection<F, Io>
|
|
||||||
where
|
|
||||||
F: Future<Item = (Io, Protocol), Error = ConnectError>,
|
|
||||||
Io: AsyncRead + AsyncWrite,
|
|
||||||
{
|
|
||||||
type Item = IoConnection<Io>;
|
|
||||||
type Error = ConnectError;
|
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
|
||||||
if let Some(ref mut h2) = self.h2 {
|
|
||||||
return match h2.poll() {
|
|
||||||
Ok(Async::Ready((snd, connection))) => {
|
|
||||||
tokio_executor::current_thread::spawn(connection.map_err(|_| ()));
|
|
||||||
Ok(Async::Ready(IoConnection::new(
|
|
||||||
ConnectionType::H2(snd),
|
|
||||||
Instant::now(),
|
|
||||||
Some(Acquired(self.key.clone(), self.inner.take())),
|
|
||||||
)))
|
|
||||||
}
|
|
||||||
Ok(Async::NotReady) => Ok(Async::NotReady),
|
|
||||||
Err(e) => Err(e.into()),
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
match self.fut.poll() {
|
|
||||||
Err(err) => Err(err),
|
|
||||||
Ok(Async::Ready((io, proto))) => {
|
|
||||||
if proto == Protocol::Http1 {
|
|
||||||
Ok(Async::Ready(IoConnection::new(
|
|
||||||
ConnectionType::H1(io),
|
|
||||||
Instant::now(),
|
|
||||||
Some(Acquired(self.key.clone(), self.inner.take())),
|
|
||||||
)))
|
|
||||||
} else {
|
|
||||||
self.h2 = Some(handshake(io));
|
|
||||||
self.poll()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(Async::NotReady) => Ok(Async::NotReady),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
enum Acquire<T> {
|
enum Acquire<T> {
|
||||||
Acquired(ConnectionType<T>, Instant),
|
Acquired(ConnectionType<T>, Instant),
|
||||||
Available,
|
Available,
|
||||||
@ -314,7 +272,7 @@ pub(crate) struct Inner<Io> {
|
|||||||
)>,
|
)>,
|
||||||
>,
|
>,
|
||||||
waiters_queue: IndexSet<(Key, usize)>,
|
waiters_queue: IndexSet<(Key, usize)>,
|
||||||
task: Option<AtomicTask>,
|
waker: LocalWaker,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Io> Inner<Io> {
|
impl<Io> Inner<Io> {
|
||||||
@ -334,7 +292,7 @@ impl<Io> Inner<Io> {
|
|||||||
|
|
||||||
impl<Io> Inner<Io>
|
impl<Io> Inner<Io>
|
||||||
where
|
where
|
||||||
Io: AsyncRead + AsyncWrite + 'static,
|
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
{
|
{
|
||||||
/// connection is not available, wait
|
/// connection is not available, wait
|
||||||
fn wait_for(
|
fn wait_for(
|
||||||
@ -343,7 +301,6 @@ where
|
|||||||
) -> (
|
) -> (
|
||||||
oneshot::Receiver<Result<IoConnection<Io>, ConnectError>>,
|
oneshot::Receiver<Result<IoConnection<Io>, ConnectError>>,
|
||||||
usize,
|
usize,
|
||||||
bool,
|
|
||||||
) {
|
) {
|
||||||
let (tx, rx) = oneshot::channel();
|
let (tx, rx) = oneshot::channel();
|
||||||
|
|
||||||
@ -353,10 +310,10 @@ where
|
|||||||
entry.insert(Some((connect, tx)));
|
entry.insert(Some((connect, tx)));
|
||||||
assert!(self.waiters_queue.insert((key, token)));
|
assert!(self.waiters_queue.insert((key, token)));
|
||||||
|
|
||||||
(rx, token, self.task.is_some())
|
(rx, token)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn acquire(&mut self, key: &Key) -> Acquire<Io> {
|
fn acquire(&mut self, key: &Key, cx: &mut Context) -> Acquire<Io> {
|
||||||
// check limits
|
// check limits
|
||||||
if self.limit > 0 && self.acquired >= self.limit {
|
if self.limit > 0 && self.acquired >= self.limit {
|
||||||
return Acquire::NotAvailable;
|
return Acquire::NotAvailable;
|
||||||
@ -384,9 +341,9 @@ where
|
|||||||
let mut io = conn.io;
|
let mut io = conn.io;
|
||||||
let mut buf = [0; 2];
|
let mut buf = [0; 2];
|
||||||
if let ConnectionType::H1(ref mut s) = io {
|
if let ConnectionType::H1(ref mut s) = io {
|
||||||
match s.read(&mut buf) {
|
match Pin::new(s).poll_read(cx, &mut buf) {
|
||||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => (),
|
Poll::Pending => (),
|
||||||
Ok(n) if n > 0 => {
|
Poll::Ready(Ok(n)) if n > 0 => {
|
||||||
if let Some(timeout) = self.disconnect_timeout {
|
if let Some(timeout) = self.disconnect_timeout {
|
||||||
if let ConnectionType::H1(io) = io {
|
if let ConnectionType::H1(io) = io {
|
||||||
tokio_executor::current_thread::spawn(
|
tokio_executor::current_thread::spawn(
|
||||||
@ -396,7 +353,7 @@ where
|
|||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
Ok(_) | Err(_) => continue,
|
_ => continue,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return Acquire::Acquired(io, conn.created);
|
return Acquire::Acquired(io, conn.created);
|
||||||
@ -431,9 +388,7 @@ where
|
|||||||
|
|
||||||
fn check_availibility(&self) {
|
fn check_availibility(&self) {
|
||||||
if !self.waiters_queue.is_empty() && self.acquired < self.limit {
|
if !self.waiters_queue.is_empty() && self.acquired < self.limit {
|
||||||
if let Some(t) = self.task.as_ref() {
|
self.waker.wake();
|
||||||
t.notify()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -457,17 +412,16 @@ where
|
|||||||
|
|
||||||
impl<T> Future for CloseConnection<T>
|
impl<T> Future for CloseConnection<T>
|
||||||
where
|
where
|
||||||
T: AsyncWrite,
|
T: AsyncWrite + Unpin,
|
||||||
{
|
{
|
||||||
type Item = ();
|
type Output = ();
|
||||||
type Error = ();
|
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<(), ()> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
|
||||||
match self.timeout.poll() {
|
match Pin::new(&mut self.timeout).poll(cx) {
|
||||||
Ok(Async::Ready(_)) | Err(_) => Ok(Async::Ready(())),
|
Poll::Ready(_) => Poll::Ready(()),
|
||||||
Ok(Async::NotReady) => match self.io.shutdown() {
|
Poll::Pending => match Pin::new(&mut self.io).poll_shutdown(cx) {
|
||||||
Ok(Async::Ready(_)) | Err(_) => Ok(Async::Ready(())),
|
Poll::Ready(_) => Poll::Ready(()),
|
||||||
Ok(Async::NotReady) => Ok(Async::NotReady),
|
Poll::Pending => Poll::Pending,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -483,16 +437,18 @@ where
|
|||||||
|
|
||||||
impl<T, Io> Future for ConnectorPoolSupport<T, Io>
|
impl<T, Io> Future for ConnectorPoolSupport<T, Io>
|
||||||
where
|
where
|
||||||
Io: AsyncRead + AsyncWrite + 'static,
|
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>,
|
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
|
||||||
T::Future: 'static,
|
+ Unpin,
|
||||||
|
T::Future: Unpin + 'static,
|
||||||
{
|
{
|
||||||
type Item = ();
|
type Output = ();
|
||||||
type Error = ();
|
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||||
let mut inner = self.inner.as_ref().borrow_mut();
|
let this = self.get_mut();
|
||||||
inner.task.as_ref().unwrap().register();
|
|
||||||
|
let mut inner = this.inner.as_ref().borrow_mut();
|
||||||
|
inner.waker.register(cx.waker());
|
||||||
|
|
||||||
// check waiters
|
// check waiters
|
||||||
loop {
|
loop {
|
||||||
@ -507,14 +463,14 @@ where
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
match inner.acquire(&key) {
|
match inner.acquire(&key, cx) {
|
||||||
Acquire::NotAvailable => break,
|
Acquire::NotAvailable => break,
|
||||||
Acquire::Acquired(io, created) => {
|
Acquire::Acquired(io, created) => {
|
||||||
let tx = inner.waiters.get_mut(token).unwrap().take().unwrap().1;
|
let tx = inner.waiters.get_mut(token).unwrap().take().unwrap().1;
|
||||||
if let Err(conn) = tx.send(Ok(IoConnection::new(
|
if let Err(conn) = tx.send(Ok(IoConnection::new(
|
||||||
io,
|
io,
|
||||||
created,
|
created,
|
||||||
Some(Acquired(key.clone(), Some(self.inner.clone()))),
|
Some(Acquired(key.clone(), Some(this.inner.clone()))),
|
||||||
))) {
|
))) {
|
||||||
let (io, created) = conn.unwrap().into_inner();
|
let (io, created) = conn.unwrap().into_inner();
|
||||||
inner.release_conn(&key, io, created);
|
inner.release_conn(&key, io, created);
|
||||||
@ -526,33 +482,38 @@ where
|
|||||||
OpenWaitingConnection::spawn(
|
OpenWaitingConnection::spawn(
|
||||||
key.clone(),
|
key.clone(),
|
||||||
tx,
|
tx,
|
||||||
self.inner.clone(),
|
this.inner.clone(),
|
||||||
self.connector.call(connect),
|
this.connector.call(connect),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let _ = inner.waiters_queue.swap_remove_index(0);
|
let _ = inner.waiters_queue.swap_remove_index(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(Async::NotReady)
|
Poll::Pending
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct OpenWaitingConnection<F, Io>
|
struct OpenWaitingConnection<F, Io>
|
||||||
where
|
where
|
||||||
Io: AsyncRead + AsyncWrite + 'static,
|
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
{
|
{
|
||||||
fut: F,
|
fut: F,
|
||||||
key: Key,
|
key: Key,
|
||||||
h2: Option<Handshake<Io, Bytes>>,
|
h2: Option<
|
||||||
|
LocalBoxFuture<
|
||||||
|
'static,
|
||||||
|
Result<(SendRequest<Bytes>, Connection<Io, Bytes>), h2::Error>,
|
||||||
|
>,
|
||||||
|
>,
|
||||||
rx: Option<oneshot::Sender<Result<IoConnection<Io>, ConnectError>>>,
|
rx: Option<oneshot::Sender<Result<IoConnection<Io>, ConnectError>>>,
|
||||||
inner: Option<Rc<RefCell<Inner<Io>>>>,
|
inner: Option<Rc<RefCell<Inner<Io>>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<F, Io> OpenWaitingConnection<F, Io>
|
impl<F, Io> OpenWaitingConnection<F, Io>
|
||||||
where
|
where
|
||||||
F: Future<Item = (Io, Protocol), Error = ConnectError> + 'static,
|
F: Future<Output = Result<(Io, Protocol), ConnectError>> + Unpin + 'static,
|
||||||
Io: AsyncRead + AsyncWrite + 'static,
|
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
{
|
{
|
||||||
fn spawn(
|
fn spawn(
|
||||||
key: Key,
|
key: Key,
|
||||||
@ -572,7 +533,7 @@ where
|
|||||||
|
|
||||||
impl<F, Io> Drop for OpenWaitingConnection<F, Io>
|
impl<F, Io> Drop for OpenWaitingConnection<F, Io>
|
||||||
where
|
where
|
||||||
Io: AsyncRead + AsyncWrite + 'static,
|
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
{
|
{
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
if let Some(inner) = self.inner.take() {
|
if let Some(inner) = self.inner.take() {
|
||||||
@ -585,59 +546,60 @@ where
|
|||||||
|
|
||||||
impl<F, Io> Future for OpenWaitingConnection<F, Io>
|
impl<F, Io> Future for OpenWaitingConnection<F, Io>
|
||||||
where
|
where
|
||||||
F: Future<Item = (Io, Protocol), Error = ConnectError>,
|
F: Future<Output = Result<(Io, Protocol), ConnectError>> + Unpin,
|
||||||
Io: AsyncRead + AsyncWrite,
|
Io: AsyncRead + AsyncWrite + Unpin,
|
||||||
{
|
{
|
||||||
type Item = ();
|
type Output = ();
|
||||||
type Error = ();
|
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||||
if let Some(ref mut h2) = self.h2 {
|
let this = self.get_mut();
|
||||||
return match h2.poll() {
|
|
||||||
Ok(Async::Ready((snd, connection))) => {
|
if let Some(ref mut h2) = this.h2 {
|
||||||
tokio_executor::current_thread::spawn(connection.map_err(|_| ()));
|
return match Pin::new(h2).poll(cx) {
|
||||||
let rx = self.rx.take().unwrap();
|
Poll::Ready(Ok((snd, connection))) => {
|
||||||
|
tokio_executor::current_thread::spawn(connection.map(|_| ()));
|
||||||
|
let rx = this.rx.take().unwrap();
|
||||||
let _ = rx.send(Ok(IoConnection::new(
|
let _ = rx.send(Ok(IoConnection::new(
|
||||||
ConnectionType::H2(snd),
|
ConnectionType::H2(snd),
|
||||||
Instant::now(),
|
Instant::now(),
|
||||||
Some(Acquired(self.key.clone(), self.inner.take())),
|
Some(Acquired(this.key.clone(), this.inner.take())),
|
||||||
)));
|
)));
|
||||||
Ok(Async::Ready(()))
|
Poll::Ready(())
|
||||||
}
|
}
|
||||||
Ok(Async::NotReady) => Ok(Async::NotReady),
|
Poll::Pending => Poll::Pending,
|
||||||
Err(err) => {
|
Poll::Ready(Err(err)) => {
|
||||||
let _ = self.inner.take();
|
let _ = this.inner.take();
|
||||||
if let Some(rx) = self.rx.take() {
|
if let Some(rx) = this.rx.take() {
|
||||||
let _ = rx.send(Err(ConnectError::H2(err)));
|
let _ = rx.send(Err(ConnectError::H2(err)));
|
||||||
}
|
}
|
||||||
Err(())
|
Poll::Ready(())
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
match self.fut.poll() {
|
match Pin::new(&mut this.fut).poll(cx) {
|
||||||
Err(err) => {
|
Poll::Ready(Err(err)) => {
|
||||||
let _ = self.inner.take();
|
let _ = this.inner.take();
|
||||||
if let Some(rx) = self.rx.take() {
|
if let Some(rx) = this.rx.take() {
|
||||||
let _ = rx.send(Err(err));
|
let _ = rx.send(Err(err));
|
||||||
}
|
}
|
||||||
Err(())
|
Poll::Ready(())
|
||||||
}
|
}
|
||||||
Ok(Async::Ready((io, proto))) => {
|
Poll::Ready(Ok((io, proto))) => {
|
||||||
if proto == Protocol::Http1 {
|
if proto == Protocol::Http1 {
|
||||||
let rx = self.rx.take().unwrap();
|
let rx = this.rx.take().unwrap();
|
||||||
let _ = rx.send(Ok(IoConnection::new(
|
let _ = rx.send(Ok(IoConnection::new(
|
||||||
ConnectionType::H1(io),
|
ConnectionType::H1(io),
|
||||||
Instant::now(),
|
Instant::now(),
|
||||||
Some(Acquired(self.key.clone(), self.inner.take())),
|
Some(Acquired(this.key.clone(), this.inner.take())),
|
||||||
)));
|
)));
|
||||||
Ok(Async::Ready(()))
|
Poll::Ready(())
|
||||||
} else {
|
} else {
|
||||||
self.h2 = Some(handshake(io));
|
this.h2 = Some(handshake(io).boxed_local());
|
||||||
self.poll()
|
Pin::new(this).poll(cx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(Async::NotReady) => Ok(Async::NotReady),
|
Poll::Pending => Poll::Pending,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -646,7 +608,7 @@ pub(crate) struct Acquired<T>(Key, Option<Rc<RefCell<Inner<T>>>>);
|
|||||||
|
|
||||||
impl<T> Acquired<T>
|
impl<T> Acquired<T>
|
||||||
where
|
where
|
||||||
T: AsyncRead + AsyncWrite + 'static,
|
T: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
{
|
{
|
||||||
pub(crate) fn close(&mut self, conn: IoConnection<T>) {
|
pub(crate) fn close(&mut self, conn: IoConnection<T>) {
|
||||||
if let Some(inner) = self.1.take() {
|
if let Some(inner) = self.1.take() {
|
||||||
|
@ -55,7 +55,7 @@ where
|
|||||||
if item.is_none() {
|
if item.is_none() {
|
||||||
let _ = this.body.take();
|
let _ = this.body.take();
|
||||||
}
|
}
|
||||||
framed.force_send(Message::Chunk(item))?;
|
framed.write(Message::Chunk(item))?;
|
||||||
}
|
}
|
||||||
Poll::Pending => body_ready = false,
|
Poll::Pending => body_ready = false,
|
||||||
}
|
}
|
||||||
@ -78,7 +78,7 @@ where
|
|||||||
|
|
||||||
// send response
|
// send response
|
||||||
if let Some(res) = this.res.take() {
|
if let Some(res) = this.res.take() {
|
||||||
framed.force_send(res)?;
|
framed.write(res)?;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -13,7 +13,7 @@ extern crate log;
|
|||||||
|
|
||||||
pub mod body;
|
pub mod body;
|
||||||
mod builder;
|
mod builder;
|
||||||
// pub mod client;
|
pub mod client;
|
||||||
mod cloneable;
|
mod cloneable;
|
||||||
mod config;
|
mod config;
|
||||||
pub mod encoding;
|
pub mod encoding;
|
||||||
@ -32,8 +32,8 @@ pub mod cookie;
|
|||||||
pub mod error;
|
pub mod error;
|
||||||
pub mod h1;
|
pub mod h1;
|
||||||
pub mod h2;
|
pub mod h2;
|
||||||
// pub mod test;
|
pub mod test;
|
||||||
// pub mod ws;
|
pub mod ws;
|
||||||
|
|
||||||
pub use self::builder::HttpServiceBuilder;
|
pub use self::builder::HttpServiceBuilder;
|
||||||
pub use self::config::{KeepAlive, ServiceConfig};
|
pub use self::config::{KeepAlive, ServiceConfig};
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
//! Test Various helpers for Actix applications to use during testing.
|
//! Test Various helpers for Actix applications to use during testing.
|
||||||
use std::fmt::Write as FmtWrite;
|
use std::fmt::Write as FmtWrite;
|
||||||
use std::io;
|
use std::io::{self, Read, Write};
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
@ -245,16 +245,33 @@ impl io::Write for TestBuffer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// impl AsyncRead for TestBuffer {}
|
impl AsyncRead for TestBuffer {
|
||||||
|
fn poll_read(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
_: &mut Context<'_>,
|
||||||
|
buf: &mut [u8],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
Poll::Ready(self.get_mut().read(buf))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// impl AsyncWrite for TestBuffer {
|
impl AsyncWrite for TestBuffer {
|
||||||
// fn shutdown(&mut self) -> Poll<(), io::Error> {
|
fn poll_write(
|
||||||
// Ok(Async::Ready(()))
|
self: Pin<&mut Self>,
|
||||||
// }
|
_: &mut Context<'_>,
|
||||||
// fn write_buf<B: Buf>(&mut self, _: &mut B) -> Poll<usize, io::Error> {
|
buf: &[u8],
|
||||||
// Ok(Async::NotReady)
|
) -> Poll<io::Result<usize>> {
|
||||||
// }
|
Poll::Ready(self.get_mut().write(buf))
|
||||||
// }
|
}
|
||||||
|
|
||||||
|
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl IoStream for TestBuffer {
|
impl IoStream for TestBuffer {
|
||||||
fn set_nodelay(&mut self, _nodelay: bool) -> io::Result<()> {
|
fn set_nodelay(&mut self, _nodelay: bool) -> io::Result<()> {
|
||||||
|
@ -1,24 +1,27 @@
|
|||||||
|
use std::future::Future;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
use actix_codec::{AsyncRead, AsyncWrite, Framed};
|
use actix_codec::{AsyncRead, AsyncWrite, Framed};
|
||||||
use actix_service::{IntoService, Service};
|
use actix_service::{IntoService, Service};
|
||||||
use actix_utils::framed::{FramedTransport, FramedTransportError};
|
use actix_utils::framed::{FramedTransport, FramedTransportError};
|
||||||
use futures::{Future, Poll};
|
|
||||||
|
|
||||||
use super::{Codec, Frame, Message};
|
use super::{Codec, Frame, Message};
|
||||||
|
|
||||||
pub struct Transport<S, T>
|
pub struct Transport<S, T>
|
||||||
where
|
where
|
||||||
S: Service<Request = Frame, Response = Message> + 'static,
|
S: Service<Request = Frame, Response = Message> + 'static,
|
||||||
T: AsyncRead + AsyncWrite,
|
T: AsyncRead + AsyncWrite + Unpin,
|
||||||
{
|
{
|
||||||
inner: FramedTransport<S, T, Codec>,
|
inner: FramedTransport<S, T, Codec>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S, T> Transport<S, T>
|
impl<S, T> Transport<S, T>
|
||||||
where
|
where
|
||||||
T: AsyncRead + AsyncWrite,
|
T: AsyncRead + AsyncWrite + Unpin,
|
||||||
S: Service<Request = Frame, Response = Message>,
|
S: Service<Request = Frame, Response = Message> + Unpin,
|
||||||
S::Future: 'static,
|
S::Future: 'static,
|
||||||
S::Error: 'static,
|
S::Error: Unpin + 'static,
|
||||||
{
|
{
|
||||||
pub fn new<F: IntoService<S>>(io: T, service: F) -> Self {
|
pub fn new<F: IntoService<S>>(io: T, service: F) -> Self {
|
||||||
Transport {
|
Transport {
|
||||||
@ -35,15 +38,14 @@ where
|
|||||||
|
|
||||||
impl<S, T> Future for Transport<S, T>
|
impl<S, T> Future for Transport<S, T>
|
||||||
where
|
where
|
||||||
T: AsyncRead + AsyncWrite,
|
T: AsyncRead + AsyncWrite + Unpin,
|
||||||
S: Service<Request = Frame, Response = Message>,
|
S: Service<Request = Frame, Response = Message> + Unpin,
|
||||||
S::Future: 'static,
|
S::Future: 'static,
|
||||||
S::Error: 'static,
|
S::Error: Unpin + 'static,
|
||||||
{
|
{
|
||||||
type Item = ();
|
type Output = Result<(), FramedTransportError<S::Error, Codec>>;
|
||||||
type Error = FramedTransportError<S::Error, Codec>;
|
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||||
self.inner.poll()
|
Pin::new(&mut self.inner).poll(cx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user