1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-01-23 15:24:36 +01:00
actix-extras/awc/src/connect.rs

132 lines
3.5 KiB
Rust
Raw Normal View History

2019-03-27 18:53:19 -07:00
use std::io;
use actix_codec::{AsyncRead, AsyncWrite, Framed};
2019-03-25 21:58:01 -07:00
use actix_http::body::Body;
use actix_http::client::{ConnectError, Connection, SendRequestError};
2019-03-27 18:53:19 -07:00
use actix_http::h1::ClientCodec;
use actix_http::{http, RequestHead, ResponseHead};
2019-03-25 21:58:01 -07:00
use actix_service::Service;
2019-03-27 18:53:19 -07:00
use futures::{Future, Poll};
2019-03-25 21:58:01 -07:00
use crate::response::ClientResponse;
2019-03-25 21:58:01 -07:00
/// Newtype wrapper around a connector value `T`.
///
/// The wrapped value is exposed as the public tuple field `.0`.
pub(crate) struct ConnectorWrapper<T>(pub T);
pub(crate) trait Connect {
fn send_request(
&mut self,
head: RequestHead,
body: Body,
) -> Box<Future<Item = ClientResponse, Error = SendRequestError>>;
2019-03-27 18:53:19 -07:00
/// Send request, returns Response and Framed
fn open_tunnel(
&mut self,
head: RequestHead,
) -> Box<
Future<
Item = (ResponseHead, Framed<BoxedSocket, ClientCodec>),
Error = SendRequestError,
>,
>;
2019-03-25 21:58:01 -07:00
}
impl<T> Connect for ConnectorWrapper<T>
where
T: Service<Request = http::Uri, Error = ConnectError>,
T::Response: Connection,
2019-03-27 18:53:19 -07:00
<T::Response as Connection>::Io: 'static,
2019-03-25 21:58:01 -07:00
<T::Response as Connection>::Future: 'static,
2019-03-27 18:53:19 -07:00
<T::Response as Connection>::TunnelFuture: 'static,
2019-03-25 21:58:01 -07:00
T::Future: 'static,
{
fn send_request(
&mut self,
head: RequestHead,
body: Body,
) -> Box<Future<Item = ClientResponse, Error = SendRequestError>> {
Box::new(
self.0
// connect to the host
.call(head.uri.clone())
.from_err()
// send request
.and_then(move |connection| connection.send_request(head, body))
.map(|(head, payload)| ClientResponse::new(head, payload)),
2019-03-25 21:58:01 -07:00
)
}
2019-03-27 18:53:19 -07:00
fn open_tunnel(
&mut self,
head: RequestHead,
) -> Box<
Future<
Item = (ResponseHead, Framed<BoxedSocket, ClientCodec>),
Error = SendRequestError,
>,
> {
Box::new(
self.0
// connect to the host
.call(head.uri.clone())
.from_err()
// send request
.and_then(move |connection| connection.open_tunnel(head))
.map(|(head, framed)| {
let framed = framed.map_io(|io| BoxedSocket(Box::new(Socket(io))));
(head, framed)
}),
)
}
}
trait AsyncSocket {
fn as_read(&self) -> &AsyncRead;
fn as_read_mut(&mut self) -> &mut AsyncRead;
fn as_write(&mut self) -> &mut AsyncWrite;
}
/// Private newtype around a concrete I/O value that is both `AsyncRead`
/// and `AsyncWrite`.
struct Socket<T>(T)
where
    T: AsyncRead + AsyncWrite;
/// Exposes the wrapped I/O value of a [`Socket`] as read/write trait objects.
impl<T> AsyncSocket for Socket<T>
where
    T: AsyncRead + AsyncWrite,
{
    /// Shared borrow of the inner value as `AsyncRead`.
    fn as_read(&self) -> &dyn AsyncRead {
        &self.0
    }

    /// Mutable borrow of the inner value as `AsyncRead`.
    fn as_read_mut(&mut self) -> &mut dyn AsyncRead {
        &mut self.0
    }

    /// Mutable borrow of the inner value as `AsyncWrite`.
    fn as_write(&mut self) -> &mut dyn AsyncWrite {
        &mut self.0
    }
}
/// Socket whose concrete I/O type is hidden behind a boxed `AsyncSocket`
/// trait object.
pub struct BoxedSocket(Box<dyn AsyncSocket>);
/// Blocking-style read interface; forwards to the boxed socket.
impl io::Read for BoxedSocket {
    /// Reads into `buf` by delegating to the inner socket's reader.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let reader = self.0.as_read_mut();
        reader.read(buf)
    }
}
/// Async read interface; forwards buffer preparation to the boxed socket.
impl AsyncRead for BoxedSocket {
    /// Delegates to the inner socket's `prepare_uninitialized_buffer` and
    /// returns its result unchanged.
    ///
    /// # Safety
    ///
    /// NOTE(review): presumably the caller must uphold whatever contract
    /// `AsyncRead::prepare_uninitialized_buffer` specifies; this wrapper
    /// adds no requirements of its own. Confirm against the trait docs.
    unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
        let reader = self.0.as_read();
        reader.prepare_uninitialized_buffer(buf)
    }
}
/// Blocking-style write interface; forwards to the boxed socket.
impl io::Write for BoxedSocket {
    /// Writes `buf` by delegating to the inner socket's writer.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let writer = self.0.as_write();
        writer.write(buf)
    }

    /// Flushes by delegating to the inner socket's writer.
    fn flush(&mut self) -> io::Result<()> {
        let writer = self.0.as_write();
        writer.flush()
    }
}
/// Async write interface; forwards shutdown to the boxed socket.
impl AsyncWrite for BoxedSocket {
    /// Shuts down the write side by delegating to the inner socket's writer.
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        self.0.as_write().shutdown()
    }
}