1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-24 16:02:59 +01:00

define generic client Connection trait

This commit is contained in:
Nikolay Kim 2018-11-15 11:10:23 -08:00
parent acd42f92d8
commit 6d9733cdf7
5 changed files with 129 additions and 81 deletions

View File

@ -5,14 +5,23 @@ use tokio_io::{AsyncRead, AsyncWrite};
use super::pool::Acquired;
pub trait Connection: AsyncRead + AsyncWrite + 'static {
/// Close connection
fn close(&mut self);
/// Release connection to the connection pool
fn release(&mut self);
}
#[doc(hidden)]
/// HTTP client connection
pub struct Connection<T> {
io: T,
pub struct IoConnection<T> {
io: Option<T>,
created: time::Instant,
pool: Option<Acquired<T>>,
}
impl<T> fmt::Debug for Connection<T>
impl<T> fmt::Debug for IoConnection<T>
where
T: fmt::Debug,
{
@ -21,59 +30,73 @@ where
}
}
impl<T: AsyncRead + AsyncWrite + 'static> Connection<T> {
impl<T: AsyncRead + AsyncWrite + 'static> IoConnection<T> {
pub(crate) fn new(io: T, created: time::Instant, pool: Acquired<T>) -> Self {
Connection {
io,
IoConnection {
created,
io: Some(io),
pool: Some(pool),
}
}
/// Raw IO stream
pub fn get_mut(&mut self) -> &mut T {
&mut self.io
self.io.as_mut().unwrap()
}
pub(crate) fn into_inner(self) -> (T, time::Instant) {
(self.io.unwrap(), self.created)
}
}
impl<T: AsyncRead + AsyncWrite + 'static> Connection for IoConnection<T> {
/// Close connection
pub fn close(mut self) {
fn close(&mut self) {
if let Some(mut pool) = self.pool.take() {
pool.close(self)
if let Some(io) = self.io.take() {
pool.close(IoConnection {
io: Some(io),
created: self.created,
pool: None,
})
}
}
}
/// Release this connection to the connection pool
pub fn release(mut self) {
fn release(&mut self) {
if let Some(mut pool) = self.pool.take() {
pool.release(self)
if let Some(io) = self.io.take() {
pool.release(IoConnection {
io: Some(io),
created: self.created,
pool: None,
})
}
}
}
pub(crate) fn into_inner(self) -> (T, time::Instant) {
(self.io, self.created)
}
}
impl<T: AsyncRead + AsyncWrite + 'static> io::Read for Connection<T> {
impl<T: AsyncRead + AsyncWrite + 'static> io::Read for IoConnection<T> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.io.read(buf)
self.io.as_mut().unwrap().read(buf)
}
}
impl<T: AsyncRead + AsyncWrite + 'static> AsyncRead for Connection<T> {}
impl<T: AsyncRead + AsyncWrite + 'static> AsyncRead for IoConnection<T> {}
impl<T: AsyncRead + AsyncWrite + 'static> io::Write for Connection<T> {
impl<T: AsyncRead + AsyncWrite + 'static> io::Write for IoConnection<T> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.io.write(buf)
self.io.as_mut().unwrap().write(buf)
}
fn flush(&mut self) -> io::Result<()> {
self.io.flush()
self.io.as_mut().unwrap().flush()
}
}
impl<T: AsyncRead + AsyncWrite + 'static> AsyncWrite for Connection<T> {
impl<T: AsyncRead + AsyncWrite + 'static> AsyncWrite for IoConnection<T> {
fn shutdown(&mut self) -> Poll<(), io::Error> {
self.io.shutdown()
self.io.as_mut().unwrap().shutdown()
}
}

View File

@ -11,7 +11,7 @@ use tokio_io::{AsyncRead, AsyncWrite};
use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
use super::connect::Connect;
use super::connection::Connection;
use super::connection::{Connection, IoConnection};
use super::error::ConnectorError;
use super::pool::ConnectionPool;
@ -130,7 +130,7 @@ impl Connector {
self,
) -> impl Service<
Request = Connect,
Response = Connection<impl AsyncRead + AsyncWrite + fmt::Debug>,
Response = impl Connection,
Error = ConnectorError,
> + Clone {
#[cfg(not(feature = "ssl"))]
@ -234,11 +234,11 @@ mod connect_impl {
T: Service<Request = Connect, Response = (Connect, Io), Error = ConnectorError>,
{
type Request = Connect;
type Response = Connection<Io>;
type Response = IoConnection<Io>;
type Error = ConnectorError;
type Future = Either<
<ConnectionPool<T, Io> as Service>::Future,
FutureResult<Connection<Io>, ConnectorError>,
FutureResult<IoConnection<Io>, ConnectorError>,
>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
@ -324,7 +324,7 @@ mod connect_impl {
>,
{
type Request = Connect;
type Response = IoEither<Connection<Io1>, Connection<Io2>>;
type Response = IoEither<IoConnection<Io1>, IoConnection<Io2>>;
type Error = ConnectorError;
type Future = Either<
FutureResult<Self::Response, Self::Error>,
@ -342,13 +342,13 @@ mod connect_impl {
if let Err(e) = req.validate() {
Either::A(err(e))
} else if req.is_secure() {
Either::B(Either::A(InnerConnectorResponseA {
fut: self.tcp_pool.call(req),
Either::B(Either::B(InnerConnectorResponseB {
fut: self.ssl_pool.call(req),
_t: PhantomData,
}))
} else {
Either::B(Either::B(InnerConnectorResponseB {
fut: self.ssl_pool.call(req),
Either::B(Either::A(InnerConnectorResponseA {
fut: self.tcp_pool.call(req),
_t: PhantomData,
}))
}
@ -370,7 +370,7 @@ mod connect_impl {
Io1: AsyncRead + AsyncWrite + 'static,
Io2: AsyncRead + AsyncWrite + 'static,
{
type Item = IoEither<Connection<Io1>, Connection<Io2>>;
type Item = IoEither<IoConnection<Io1>, IoConnection<Io2>>;
type Error = ConnectorError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@ -396,7 +396,7 @@ mod connect_impl {
Io1: AsyncRead + AsyncWrite + 'static,
Io2: AsyncRead + AsyncWrite + 'static,
{
type Item = IoEither<Connection<Io1>, Connection<Io2>>;
type Item = IoEither<IoConnection<Io1>, IoConnection<Io2>>;
type Error = ConnectorError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@ -413,10 +413,30 @@ pub(crate) enum IoEither<Io1, Io2> {
B(Io2),
}
impl<Io1, Io2> Connection for IoEither<Io1, Io2>
where
Io1: Connection,
Io2: Connection,
{
fn close(&mut self) {
match self {
IoEither::A(ref mut io) => io.close(),
IoEither::B(ref mut io) => io.close(),
}
}
fn release(&mut self) {
match self {
IoEither::A(ref mut io) => io.release(),
IoEither::B(ref mut io) => io.release(),
}
}
}
impl<Io1, Io2> io::Read for IoEither<Io1, Io2>
where
Io1: io::Read,
Io2: io::Read,
Io1: Connection,
Io2: Connection,
{
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match self {
@ -428,8 +448,8 @@ where
impl<Io1, Io2> AsyncRead for IoEither<Io1, Io2>
where
Io1: AsyncRead,
Io2: AsyncRead,
Io1: Connection,
Io2: Connection,
{
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
match self {
@ -441,8 +461,8 @@ where
impl<Io1, Io2> AsyncWrite for IoEither<Io1, Io2>
where
Io1: AsyncWrite,
Io2: AsyncWrite,
Io1: Connection,
Io2: Connection,
{
fn shutdown(&mut self) -> Poll<(), io::Error> {
match self {
@ -468,8 +488,8 @@ where
impl<Io1, Io2> io::Write for IoEither<Io1, Io2>
where
Io1: io::Write,
Io2: io::Write,
Io1: Connection,
Io2: Connection,
{
fn flush(&mut self) -> io::Result<()> {
match self {

View File

@ -15,27 +15,32 @@ use body::{BodyType, MessageBody, PayloadStream};
use error::PayloadError;
use h1;
pub fn send_request<T, Io, B>(
pub(crate) fn send_request<T, I, B>(
head: RequestHead,
body: B,
connector: &mut T,
) -> impl Future<Item = ClientResponse, Error = SendRequestError>
where
T: Service<Request = Connect, Response = Connection<Io>, Error = ConnectorError>,
T: Service<Request = Connect, Response = I, Error = ConnectorError>,
B: MessageBody,
Io: AsyncRead + AsyncWrite + 'static,
I: Connection,
{
let tp = body.tp();
connector
// connect to the host
.call(Connect::new(head.uri.clone()))
.from_err()
// create Framed and send reqest
.map(|io| Framed::new(io, h1::ClientCodec::default()))
.and_then(|framed| framed.send((head, tp).into()).from_err())
// send request body
.and_then(move |framed| match body.tp() {
BodyType::None | BodyType::Zero => Either::A(ok(framed)),
_ => Either::B(SendBody::new(body, framed)),
}).and_then(|framed| {
})
// read response and init read body
.and_then(|framed| {
framed
.into_future()
.map_err(|(e, _)| SendRequestError::from(e))
@ -55,19 +60,20 @@ where
})
}
struct SendBody<Io, B> {
/// Future responsible for sending request body to the peer
struct SendBody<I, B> {
body: Option<B>,
framed: Option<Framed<Connection<Io>, h1::ClientCodec>>,
framed: Option<Framed<I, h1::ClientCodec>>,
write_buf: VecDeque<h1::Message<(RequestHead, BodyType)>>,
flushed: bool,
}
impl<Io, B> SendBody<Io, B>
impl<I, B> SendBody<I, B>
where
Io: AsyncRead + AsyncWrite + 'static,
I: AsyncRead + AsyncWrite + 'static,
B: MessageBody,
{
fn new(body: B, framed: Framed<Connection<Io>, h1::ClientCodec>) -> Self {
fn new(body: B, framed: Framed<I, h1::ClientCodec>) -> Self {
SendBody {
body: Some(body),
framed: Some(framed),
@ -77,12 +83,12 @@ where
}
}
impl<Io, B> Future for SendBody<Io, B>
impl<I, B> Future for SendBody<I, B>
where
Io: AsyncRead + AsyncWrite + 'static,
I: Connection,
B: MessageBody,
{
type Item = Framed<Connection<Io>, h1::ClientCodec>;
type Item = Framed<I, h1::ClientCodec>;
type Error = SendRequestError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@ -135,7 +141,7 @@ impl Stream for EmptyPayload {
}
pub(crate) struct Payload<Io> {
framed: Option<Framed<Connection<Io>, h1::ClientPayloadCodec>>,
framed: Option<Framed<Io, h1::ClientPayloadCodec>>,
}
impl Payload<()> {
@ -144,15 +150,15 @@ impl Payload<()> {
}
}
impl<Io: AsyncRead + AsyncWrite + 'static> Payload<Io> {
fn stream(framed: Framed<Connection<Io>, h1::ClientCodec>) -> PayloadStream {
impl<Io: Connection> Payload<Io> {
fn stream(framed: Framed<Io, h1::ClientCodec>) -> PayloadStream {
Box::new(Payload {
framed: Some(framed.map_codec(|codec| codec.into_payload_codec())),
})
}
}
impl<Io: AsyncRead + AsyncWrite + 'static> Stream for Payload<Io> {
impl<Io: Connection> Stream for Payload<Io> {
type Item = Bytes;
type Error = PayloadError;
@ -170,11 +176,11 @@ impl<Io: AsyncRead + AsyncWrite + 'static> Stream for Payload<Io> {
}
}
fn release_connection<T, U>(framed: Framed<Connection<T>, U>)
fn release_connection<T, U>(framed: Framed<T, U>)
where
T: AsyncRead + AsyncWrite + 'static,
T: Connection,
{
let parts = framed.into_parts();
let mut parts = framed.into_parts();
if parts.read_buf.is_empty() && parts.write_buf.is_empty() {
parts.io.release()
} else {

View File

@ -17,7 +17,7 @@ use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::{sleep, Delay};
use super::connect::Connect;
use super::connection::Connection;
use super::connection::IoConnection;
use super::error::ConnectorError;
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
@ -89,10 +89,10 @@ where
T: Service<Request = Connect, Response = (Connect, Io), Error = ConnectorError>,
{
type Request = Connect;
type Response = Connection<Io>;
type Response = IoConnection<Io>;
type Error = ConnectorError;
type Future = Either<
FutureResult<Connection<Io>, ConnectorError>,
FutureResult<IoConnection<Io>, ConnectorError>,
Either<WaitForConnection<Io>, OpenConnection<T::Future, Io>>,
>;
@ -107,7 +107,7 @@ where
match self.1.as_ref().borrow_mut().acquire(&key) {
Acquire::Acquired(io, created) => {
// use existing connection
Either::A(ok(Connection::new(
Either::A(ok(IoConnection::new(
io,
created,
Acquired(key, Some(self.1.clone())),
@ -142,7 +142,7 @@ where
{
key: Key,
token: usize,
rx: oneshot::Receiver<Result<Connection<Io>, ConnectorError>>,
rx: oneshot::Receiver<Result<IoConnection<Io>, ConnectorError>>,
inner: Option<Rc<RefCell<Inner<Io>>>>,
}
@ -163,7 +163,7 @@ impl<Io> Future for WaitForConnection<Io>
where
Io: AsyncRead + AsyncWrite,
{
type Item = Connection<Io>;
type Item = IoConnection<Io>;
type Error = ConnectorError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@ -226,7 +226,7 @@ where
F: Future<Item = (Connect, Io), Error = ConnectorError>,
Io: AsyncRead + AsyncWrite,
{
type Item = Connection<Io>;
type Item = IoConnection<Io>;
type Error = ConnectorError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@ -234,7 +234,7 @@ where
Err(err) => Err(err.into()),
Ok(Async::Ready((_, io))) => {
let _ = self.inner.take();
Ok(Async::Ready(Connection::new(
Ok(Async::Ready(IoConnection::new(
io,
Instant::now(),
Acquired(self.key.clone(), self.inner.clone()),
@ -251,7 +251,7 @@ where
{
fut: F,
key: Key,
rx: Option<oneshot::Sender<Result<Connection<Io>, ConnectorError>>>,
rx: Option<oneshot::Sender<Result<IoConnection<Io>, ConnectorError>>>,
inner: Option<Rc<RefCell<Inner<Io>>>>,
}
@ -262,7 +262,7 @@ where
{
fn spawn(
key: Key,
rx: oneshot::Sender<Result<Connection<Io>, ConnectorError>>,
rx: oneshot::Sender<Result<IoConnection<Io>, ConnectorError>>,
inner: Rc<RefCell<Inner<Io>>>,
fut: F,
) {
@ -308,7 +308,7 @@ where
Ok(Async::Ready((_, io))) => {
let _ = self.inner.take();
if let Some(rx) = self.rx.take() {
let _ = rx.send(Ok(Connection::new(
let _ = rx.send(Ok(IoConnection::new(
io,
Instant::now(),
Acquired(self.key.clone(), self.inner.clone()),
@ -336,7 +336,7 @@ pub(crate) struct Inner<Io> {
available: HashMap<Key, VecDeque<AvailableConnection<Io>>>,
waiters: Slab<(
Connect,
oneshot::Sender<Result<Connection<Io>, ConnectorError>>,
oneshot::Sender<Result<IoConnection<Io>, ConnectorError>>,
)>,
waiters_queue: IndexSet<(Key, usize)>,
task: AtomicTask,
@ -378,7 +378,7 @@ where
&mut self,
connect: Connect,
) -> (
oneshot::Receiver<Result<Connection<Io>, ConnectorError>>,
oneshot::Receiver<Result<IoConnection<Io>, ConnectorError>>,
usize,
) {
let (tx, rx) = oneshot::channel();
@ -479,7 +479,7 @@ where
Acquire::NotAvailable => break,
Acquire::Acquired(io, created) => {
let (_, tx) = inner.waiters.remove(token);
if let Err(conn) = tx.send(Ok(Connection::new(
if let Err(conn) = tx.send(Ok(IoConnection::new(
io,
created,
Acquired(key.clone(), Some(self.inner.clone())),
@ -546,13 +546,13 @@ impl<T> Acquired<T>
where
T: AsyncRead + AsyncWrite + 'static,
{
pub(crate) fn close(&mut self, conn: Connection<T>) {
pub(crate) fn close(&mut self, conn: IoConnection<T>) {
if let Some(inner) = self.1.take() {
let (io, _) = conn.into_inner();
inner.as_ref().borrow_mut().release_close(io);
}
}
pub(crate) fn release(&mut self, conn: Connection<T>) {
pub(crate) fn release(&mut self, conn: IoConnection<T>) {
if let Some(inner) = self.1.take() {
let (io, created) = conn.into_inner();
inner

View File

@ -7,7 +7,6 @@ use bytes::{BufMut, Bytes, BytesMut};
use cookie::{Cookie, CookieJar};
use futures::{Future, Stream};
use percent_encoding::{percent_encode, USERINFO_ENCODE_SET};
use tokio_io::{AsyncRead, AsyncWrite};
use urlcrate::Url;
use body::{MessageBody, MessageBodyStream};
@ -176,13 +175,13 @@ where
// Send request
///
/// This method returns a future that resolves to a ClientResponse
pub fn send<T, Io>(
pub fn send<T, I>(
self,
connector: &mut T,
) -> impl Future<Item = ClientResponse, Error = SendRequestError>
where
T: Service<Request = Connect, Response = Connection<Io>, Error = ConnectorError>,
Io: AsyncRead + AsyncWrite + 'static,
T: Service<Request = Connect, Response = I, Error = ConnectorError>,
I: Connection,
{
pipeline::send_request(self.head, self.body, connector)
}