mirror of
https://github.com/fafhrd91/actix-web
synced 2024-11-25 17:02:44 +01:00
249 lines
7.3 KiB
Rust
249 lines
7.3 KiB
Rust
use std::{io, time};
|
|
|
|
use actix_codec::{AsyncRead, AsyncWrite, Framed};
|
|
use bytes::Bytes;
|
|
use futures::future::{err, ok, Either};
|
|
use futures::{Async, Future, Poll, Sink, Stream};
|
|
|
|
use super::connection::{ConnectionLifetime, ConnectionType, IoConnection};
|
|
use super::error::{ConnectError, SendRequestError};
|
|
use super::pool::Acquired;
|
|
use super::response::ClientResponse;
|
|
use crate::body::{BodyLength, MessageBody};
|
|
use crate::error::PayloadError;
|
|
use crate::h1;
|
|
use crate::message::RequestHead;
|
|
|
|
pub(crate) fn send_request<T, B>(
|
|
io: T,
|
|
head: RequestHead,
|
|
body: B,
|
|
created: time::Instant,
|
|
pool: Option<Acquired<T>>,
|
|
) -> impl Future<Item = ClientResponse, Error = SendRequestError>
|
|
where
|
|
T: AsyncRead + AsyncWrite + 'static,
|
|
B: MessageBody,
|
|
{
|
|
let io = H1Connection {
|
|
created,
|
|
pool,
|
|
io: Some(io),
|
|
};
|
|
|
|
let len = body.length();
|
|
|
|
// create Framed and send reqest
|
|
Framed::new(io, h1::ClientCodec::default())
|
|
.send((head, len).into())
|
|
.from_err()
|
|
// send request body
|
|
.and_then(move |framed| match body.length() {
|
|
BodyLength::None | BodyLength::Empty | BodyLength::Sized(0) => {
|
|
Either::A(ok(framed))
|
|
}
|
|
_ => Either::B(SendBody::new(body, framed)),
|
|
})
|
|
// read response and init read body
|
|
.and_then(|framed| {
|
|
framed
|
|
.into_future()
|
|
.map_err(|(e, _)| SendRequestError::from(e))
|
|
.and_then(|(item, framed)| {
|
|
if let Some(mut res) = item {
|
|
match framed.get_codec().message_type() {
|
|
h1::MessageType::None => {
|
|
let force_close = !framed.get_codec().keepalive();
|
|
release_connection(framed, force_close)
|
|
}
|
|
_ => {
|
|
res.set_payload(Payload::stream(framed).into());
|
|
}
|
|
}
|
|
ok(res)
|
|
} else {
|
|
err(ConnectError::Disconnected.into())
|
|
}
|
|
})
|
|
})
|
|
}
|
|
|
|
#[doc(hidden)]
|
|
/// HTTP client connection
|
|
pub struct H1Connection<T> {
|
|
io: Option<T>,
|
|
created: time::Instant,
|
|
pool: Option<Acquired<T>>,
|
|
}
|
|
|
|
impl<T: AsyncRead + AsyncWrite + 'static> ConnectionLifetime for H1Connection<T> {
|
|
/// Close connection
|
|
fn close(&mut self) {
|
|
if let Some(mut pool) = self.pool.take() {
|
|
if let Some(io) = self.io.take() {
|
|
pool.close(IoConnection::new(
|
|
ConnectionType::H1(io),
|
|
self.created,
|
|
None,
|
|
));
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Release this connection to the connection pool
|
|
fn release(&mut self) {
|
|
if let Some(mut pool) = self.pool.take() {
|
|
if let Some(io) = self.io.take() {
|
|
pool.release(IoConnection::new(
|
|
ConnectionType::H1(io),
|
|
self.created,
|
|
None,
|
|
));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<T: AsyncRead + AsyncWrite + 'static> io::Read for H1Connection<T> {
|
|
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
|
self.io.as_mut().unwrap().read(buf)
|
|
}
|
|
}
|
|
|
|
impl<T: AsyncRead + AsyncWrite + 'static> AsyncRead for H1Connection<T> {}
|
|
|
|
impl<T: AsyncRead + AsyncWrite + 'static> io::Write for H1Connection<T> {
|
|
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
|
self.io.as_mut().unwrap().write(buf)
|
|
}
|
|
|
|
fn flush(&mut self) -> io::Result<()> {
|
|
self.io.as_mut().unwrap().flush()
|
|
}
|
|
}
|
|
|
|
impl<T: AsyncRead + AsyncWrite + 'static> AsyncWrite for H1Connection<T> {
|
|
fn shutdown(&mut self) -> Poll<(), io::Error> {
|
|
self.io.as_mut().unwrap().shutdown()
|
|
}
|
|
}
|
|
|
|
/// Future responsible for sending request body to the peer
|
|
pub(crate) struct SendBody<I, B> {
|
|
body: Option<B>,
|
|
framed: Option<Framed<I, h1::ClientCodec>>,
|
|
flushed: bool,
|
|
}
|
|
|
|
impl<I, B> SendBody<I, B>
|
|
where
|
|
I: AsyncRead + AsyncWrite + 'static,
|
|
B: MessageBody,
|
|
{
|
|
pub(crate) fn new(body: B, framed: Framed<I, h1::ClientCodec>) -> Self {
|
|
SendBody {
|
|
body: Some(body),
|
|
framed: Some(framed),
|
|
flushed: true,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<I, B> Future for SendBody<I, B>
|
|
where
|
|
I: ConnectionLifetime,
|
|
B: MessageBody,
|
|
{
|
|
type Item = Framed<I, h1::ClientCodec>;
|
|
type Error = SendRequestError;
|
|
|
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
|
let mut body_ready = true;
|
|
loop {
|
|
while body_ready
|
|
&& self.body.is_some()
|
|
&& !self.framed.as_ref().unwrap().is_write_buf_full()
|
|
{
|
|
match self.body.as_mut().unwrap().poll_next()? {
|
|
Async::Ready(item) => {
|
|
// check if body is done
|
|
if item.is_none() {
|
|
let _ = self.body.take();
|
|
}
|
|
self.flushed = false;
|
|
self.framed
|
|
.as_mut()
|
|
.unwrap()
|
|
.force_send(h1::Message::Chunk(item))?;
|
|
break;
|
|
}
|
|
Async::NotReady => body_ready = false,
|
|
}
|
|
}
|
|
|
|
if !self.flushed {
|
|
match self.framed.as_mut().unwrap().poll_complete()? {
|
|
Async::Ready(_) => {
|
|
self.flushed = true;
|
|
continue;
|
|
}
|
|
Async::NotReady => return Ok(Async::NotReady),
|
|
}
|
|
}
|
|
|
|
if self.body.is_none() {
|
|
return Ok(Async::Ready(self.framed.take().unwrap()));
|
|
}
|
|
return Ok(Async::NotReady);
|
|
}
|
|
}
|
|
}
|
|
|
|
pub(crate) struct Payload<Io> {
|
|
framed: Option<Framed<Io, h1::ClientPayloadCodec>>,
|
|
}
|
|
|
|
impl<Io: ConnectionLifetime> Payload<Io> {
|
|
pub fn stream(
|
|
framed: Framed<Io, h1::ClientCodec>,
|
|
) -> Box<Stream<Item = Bytes, Error = PayloadError>> {
|
|
Box::new(Payload {
|
|
framed: Some(framed.map_codec(|codec| codec.into_payload_codec())),
|
|
})
|
|
}
|
|
}
|
|
|
|
impl<Io: ConnectionLifetime> Stream for Payload<Io> {
|
|
type Item = Bytes;
|
|
type Error = PayloadError;
|
|
|
|
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
|
match self.framed.as_mut().unwrap().poll()? {
|
|
Async::NotReady => Ok(Async::NotReady),
|
|
Async::Ready(Some(chunk)) => {
|
|
if let Some(chunk) = chunk {
|
|
Ok(Async::Ready(Some(chunk)))
|
|
} else {
|
|
let framed = self.framed.take().unwrap();
|
|
let force_close = framed.get_codec().keepalive();
|
|
release_connection(framed, force_close);
|
|
Ok(Async::Ready(None))
|
|
}
|
|
}
|
|
Async::Ready(None) => Ok(Async::Ready(None)),
|
|
}
|
|
}
|
|
}
|
|
|
|
fn release_connection<T, U>(framed: Framed<T, U>, force_close: bool)
|
|
where
|
|
T: ConnectionLifetime,
|
|
{
|
|
let mut parts = framed.into_parts();
|
|
if !force_close && parts.read_buf.is_empty() && parts.write_buf.is_empty() {
|
|
parts.io.release()
|
|
} else {
|
|
parts.io.close()
|
|
}
|
|
}
|