1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-01-23 23:34:35 +01:00

297 lines
8.1 KiB
Rust
Raw Normal View History

2019-11-18 18:42:27 +06:00
use std::pin::Pin;
use std::task::{Context, Poll};
2019-12-05 23:35:43 +06:00
use std::{fmt, io, mem, time};
2018-11-11 23:12:54 -08:00
2019-03-27 18:53:19 -07:00
use actix_codec::{AsyncRead, AsyncWrite, Framed};
use bytes::{Buf, Bytes};
2019-11-18 18:42:27 +06:00
use futures::future::{err, Either, Future, FutureExt, LocalBoxFuture, Ready};
2019-01-28 20:41:09 -08:00
use h2::client::SendRequest;
2019-11-19 18:54:19 +06:00
use pin_project::{pin_project, project};
2018-11-11 23:12:54 -08:00
2019-01-28 20:41:09 -08:00
use crate::body::MessageBody;
2019-03-27 18:53:19 -07:00
use crate::h1::ClientCodec;
use crate::message::{RequestHeadType, ResponseHead};
use crate::payload::Payload;
2019-01-28 20:41:09 -08:00
use super::error::SendRequestError;
2019-04-08 11:09:57 -07:00
use super::pool::{Acquired, Protocol};
2019-01-28 20:41:09 -08:00
use super::{h1proto, h2proto};
pub(crate) enum ConnectionType<Io> {
H1(Io),
H2(SendRequest<Bytes>),
}
2019-01-29 10:34:27 -08:00
pub trait Connection {
type Io: AsyncRead + AsyncWrite + Unpin;
2019-11-18 18:42:27 +06:00
type Future: Future<Output = Result<(ResponseHead, Payload), SendRequestError>>;
2019-01-28 20:41:09 -08:00
2019-04-08 11:09:57 -07:00
fn protocol(&self) -> Protocol;
2019-03-25 21:52:45 -07:00
/// Send request and body
fn send_request<B: MessageBody + 'static, H: Into<RequestHeadType>>(
2019-01-28 20:41:09 -08:00
self,
head: H,
2019-01-28 20:41:09 -08:00
body: B,
) -> Self::Future;
2019-03-27 18:53:19 -07:00
type TunnelFuture: Future<
2019-11-18 18:42:27 +06:00
Output = Result<(ResponseHead, Framed<Self::Io, ClientCodec>), SendRequestError>,
2019-03-27 18:53:19 -07:00
>;
/// Send request, returns Response and Framed
fn open_tunnel<H: Into<RequestHeadType>>(self, head: H) -> Self::TunnelFuture;
2019-01-28 20:41:09 -08:00
}
2018-11-11 23:12:54 -08:00
2019-11-19 18:54:19 +06:00
pub(crate) trait ConnectionLifetime: AsyncRead + AsyncWrite + 'static {
2018-11-15 11:10:23 -08:00
/// Close connection
fn close(&mut self);
/// Release connection to the connection pool
fn release(&mut self);
}
#[doc(hidden)]
2018-11-11 23:12:54 -08:00
/// HTTP client connection
2018-11-15 11:10:23 -08:00
pub struct IoConnection<T> {
2019-01-28 20:41:09 -08:00
io: Option<ConnectionType<T>>,
2018-11-11 23:12:54 -08:00
created: time::Instant,
pool: Option<Acquired<T>>,
}
2018-11-15 11:10:23 -08:00
impl<T> fmt::Debug for IoConnection<T>
2018-11-11 23:12:54 -08:00
where
2018-11-13 22:53:30 -08:00
T: fmt::Debug,
2018-11-11 23:12:54 -08:00
{
2019-12-08 00:46:51 +06:00
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2019-01-28 20:41:09 -08:00
match self.io {
Some(ConnectionType::H1(ref io)) => write!(f, "H1Connection({:?})", io),
Some(ConnectionType::H2(_)) => write!(f, "H2Connection"),
None => write!(f, "Connection(Empty)"),
}
2018-11-11 23:12:54 -08:00
}
}
2019-11-19 18:54:19 +06:00
impl<T: AsyncRead + AsyncWrite + Unpin> IoConnection<T> {
2019-01-28 20:41:09 -08:00
pub(crate) fn new(
io: ConnectionType<T>,
created: time::Instant,
pool: Option<Acquired<T>>,
) -> Self {
2018-11-15 11:10:23 -08:00
IoConnection {
2019-01-28 20:41:09 -08:00
pool,
2018-11-11 23:12:54 -08:00
created,
2018-11-15 11:10:23 -08:00
io: Some(io),
2018-11-11 23:12:54 -08:00
}
}
2019-01-28 20:41:09 -08:00
pub(crate) fn into_inner(self) -> (ConnectionType<T>, time::Instant) {
2018-11-15 11:10:23 -08:00
(self.io.unwrap(), self.created)
}
}
2019-01-29 10:34:27 -08:00
impl<T> Connection for IoConnection<T>
2019-01-28 20:41:09 -08:00
where
2019-11-18 18:42:27 +06:00
T: AsyncRead + AsyncWrite + Unpin + 'static,
2019-01-28 20:41:09 -08:00
{
2019-03-27 18:53:19 -07:00
type Io = T;
type Future =
2019-11-18 18:42:27 +06:00
LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>>;
2019-01-28 20:41:09 -08:00
2019-04-08 11:09:57 -07:00
fn protocol(&self) -> Protocol {
match self.io {
Some(ConnectionType::H1(_)) => Protocol::Http1,
Some(ConnectionType::H2(_)) => Protocol::Http2,
None => Protocol::Http1,
}
}
fn send_request<B: MessageBody + 'static, H: Into<RequestHeadType>>(
2019-01-28 20:41:09 -08:00
mut self,
head: H,
2019-01-28 20:41:09 -08:00
body: B,
) -> Self::Future {
match self.io.take().unwrap() {
2019-11-18 18:42:27 +06:00
ConnectionType::H1(io) => {
h1proto::send_request(io, head.into(), body, self.created, self.pool)
.boxed_local()
}
ConnectionType::H2(io) => {
h2proto::send_request(io, head.into(), body, self.created, self.pool)
.boxed_local()
}
2018-11-11 23:12:54 -08:00
}
}
2019-03-27 18:53:19 -07:00
type TunnelFuture = Either<
2019-11-18 18:42:27 +06:00
LocalBoxFuture<
'static,
Result<(ResponseHead, Framed<Self::Io, ClientCodec>), SendRequestError>,
2019-03-27 18:53:19 -07:00
>,
2019-11-18 18:42:27 +06:00
Ready<Result<(ResponseHead, Framed<Self::Io, ClientCodec>), SendRequestError>>,
2019-03-27 18:53:19 -07:00
>;
/// Send request, returns Response and Framed
fn open_tunnel<H: Into<RequestHeadType>>(mut self, head: H) -> Self::TunnelFuture {
2019-03-27 18:53:19 -07:00
match self.io.take().unwrap() {
ConnectionType::H1(io) => {
2019-11-18 18:42:27 +06:00
Either::Left(h1proto::open_tunnel(io, head.into()).boxed_local())
2019-03-27 18:53:19 -07:00
}
ConnectionType::H2(io) => {
if let Some(mut pool) = self.pool.take() {
pool.release(IoConnection::new(
ConnectionType::H2(io),
self.created,
None,
));
}
2019-11-18 18:42:27 +06:00
Either::Right(err(SendRequestError::TunnelNotSupported))
2019-03-27 18:53:19 -07:00
}
}
}
2018-11-11 23:12:54 -08:00
}
2019-01-28 20:41:09 -08:00
#[allow(dead_code)]
pub(crate) enum EitherConnection<A, B> {
A(IoConnection<A>),
B(IoConnection<B>),
2018-11-11 23:12:54 -08:00
}
2019-01-29 10:34:27 -08:00
impl<A, B> Connection for EitherConnection<A, B>
2019-01-28 20:41:09 -08:00
where
2019-11-18 18:42:27 +06:00
A: AsyncRead + AsyncWrite + Unpin + 'static,
B: AsyncRead + AsyncWrite + Unpin + 'static,
2019-01-28 20:41:09 -08:00
{
2019-03-27 18:53:19 -07:00
type Io = EitherIo<A, B>;
type Future =
2019-11-18 18:42:27 +06:00
LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>>;
2019-01-28 20:41:09 -08:00
2019-04-08 11:09:57 -07:00
fn protocol(&self) -> Protocol {
match self {
EitherConnection::A(con) => con.protocol(),
EitherConnection::B(con) => con.protocol(),
}
}
fn send_request<RB: MessageBody + 'static, H: Into<RequestHeadType>>(
2019-01-28 20:41:09 -08:00
self,
head: H,
2019-01-28 20:41:09 -08:00
body: RB,
) -> Self::Future {
match self {
EitherConnection::A(con) => con.send_request(head, body),
EitherConnection::B(con) => con.send_request(head, body),
}
2018-11-11 23:12:54 -08:00
}
2019-03-27 18:53:19 -07:00
2019-11-18 18:42:27 +06:00
type TunnelFuture = LocalBoxFuture<
'static,
Result<(ResponseHead, Framed<Self::Io, ClientCodec>), SendRequestError>,
2019-03-27 18:53:19 -07:00
>;
/// Send request, returns Response and Framed
fn open_tunnel<H: Into<RequestHeadType>>(self, head: H) -> Self::TunnelFuture {
2019-03-27 18:53:19 -07:00
match self {
2019-11-18 18:42:27 +06:00
EitherConnection::A(con) => con
.open_tunnel(head)
.map(|res| res.map(|(head, framed)| (head, framed.map_io(EitherIo::A))))
.boxed_local(),
EitherConnection::B(con) => con
.open_tunnel(head)
.map(|res| res.map(|(head, framed)| (head, framed.map_io(EitherIo::B))))
.boxed_local(),
2019-03-27 18:53:19 -07:00
}
}
}
2019-11-19 18:54:19 +06:00
#[pin_project]
2019-03-27 18:53:19 -07:00
pub enum EitherIo<A, B> {
2019-11-19 18:54:19 +06:00
A(#[pin] A),
B(#[pin] B),
2019-03-27 18:53:19 -07:00
}
2019-11-18 18:42:27 +06:00
impl<A, B> AsyncRead for EitherIo<A, B>
2019-03-27 18:53:19 -07:00
where
2019-11-19 18:54:19 +06:00
A: AsyncRead,
B: AsyncRead,
2019-03-27 18:53:19 -07:00
{
2019-11-19 18:54:19 +06:00
#[project]
2019-11-18 18:42:27 +06:00
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
2019-11-19 18:54:19 +06:00
#[project]
match self.project() {
EitherIo::A(val) => val.poll_read(cx, buf),
EitherIo::B(val) => val.poll_read(cx, buf),
2019-03-27 18:53:19 -07:00
}
}
2019-12-05 23:35:43 +06:00
unsafe fn prepare_uninitialized_buffer(
&self,
buf: &mut [mem::MaybeUninit<u8>],
) -> bool {
2019-03-27 18:53:19 -07:00
match self {
EitherIo::A(ref val) => val.prepare_uninitialized_buffer(buf),
EitherIo::B(ref val) => val.prepare_uninitialized_buffer(buf),
}
}
}
2019-11-18 18:42:27 +06:00
impl<A, B> AsyncWrite for EitherIo<A, B>
2019-03-27 18:53:19 -07:00
where
2019-11-19 18:54:19 +06:00
A: AsyncWrite,
B: AsyncWrite,
2019-03-27 18:53:19 -07:00
{
2019-11-19 18:54:19 +06:00
#[project]
2019-11-18 18:42:27 +06:00
fn poll_write(
self: Pin<&mut Self>,
2019-12-08 00:46:51 +06:00
cx: &mut Context<'_>,
2019-11-18 18:42:27 +06:00
buf: &[u8],
) -> Poll<io::Result<usize>> {
2019-11-19 18:54:19 +06:00
#[project]
match self.project() {
EitherIo::A(val) => val.poll_write(cx, buf),
EitherIo::B(val) => val.poll_write(cx, buf),
2019-03-27 18:53:19 -07:00
}
}
2019-11-19 18:54:19 +06:00
#[project]
2019-11-18 18:42:27 +06:00
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
2019-11-19 18:54:19 +06:00
#[project]
match self.project() {
EitherIo::A(val) => val.poll_flush(cx),
EitherIo::B(val) => val.poll_flush(cx),
2019-03-27 18:53:19 -07:00
}
}
2019-11-19 18:54:19 +06:00
#[project]
2019-11-18 18:42:27 +06:00
fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
2019-11-19 18:54:19 +06:00
#[project]
match self.project() {
EitherIo::A(val) => val.poll_shutdown(cx),
EitherIo::B(val) => val.poll_shutdown(cx),
2019-03-27 18:53:19 -07:00
}
}
2019-11-19 18:54:19 +06:00
#[project]
2019-11-18 18:42:27 +06:00
fn poll_write_buf<U: Buf>(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut U,
) -> Poll<Result<usize, io::Error>>
2019-03-27 18:53:19 -07:00
where
Self: Sized,
{
2019-11-19 18:54:19 +06:00
#[project]
match self.project() {
EitherIo::A(val) => val.poll_write_buf(cx, buf),
EitherIo::B(val) => val.poll_write_buf(cx, buf),
2019-03-27 18:53:19 -07:00
}
}
2018-11-11 23:12:54 -08:00
}