1
0
mirror of https://github.com/fafhrd91/actix-web synced 2024-11-25 08:52:42 +01:00
actix-web/src/client/h2proto.rs

188 lines
6.2 KiB
Rust
Raw Normal View History

use std::cell::RefCell;
2019-01-29 05:41:09 +01:00
use std::time;
use actix_codec::{AsyncRead, AsyncWrite};
use bytes::Bytes;
use futures::future::{err, Either};
2019-02-06 20:44:15 +01:00
use futures::{Async, Future, Poll};
2019-01-29 05:41:09 +01:00
use h2::{client::SendRequest, SendStream};
2019-02-06 20:44:15 +01:00
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING};
use http::{request::Request, HttpTryFrom, Version};
use crate::body::{BodyLength, MessageBody};
use crate::message::{RequestHead, ResponseHead};
2019-01-29 05:41:09 +01:00
use super::connection::{ConnectionType, IoConnection};
use super::error::SendRequestError;
use super::pool::Acquired;
use super::response::ClientResponse;
pub(crate) fn send_request<T, B>(
io: SendRequest<Bytes>,
head: RequestHead,
body: B,
created: time::Instant,
pool: Option<Acquired<T>>,
) -> impl Future<Item = ClientResponse, Error = SendRequestError>
where
T: AsyncRead + AsyncWrite + 'static,
B: MessageBody,
{
trace!("Sending client request: {:?} {:?}", head, body.length());
2019-02-06 20:44:15 +01:00
let length = body.length();
let eof = match length {
2019-01-29 05:41:09 +01:00
BodyLength::None | BodyLength::Empty | BodyLength::Sized(0) => true,
_ => false,
};
io.ready()
.map_err(SendRequestError::from)
.and_then(move |mut io| {
let mut req = Request::new(());
*req.uri_mut() = head.uri;
*req.method_mut() = head.method;
*req.version_mut() = Version::HTTP_2;
2019-02-06 20:44:15 +01:00
let mut skip_len = true;
let mut has_date = false;
// Content length
let _ = match length {
BodyLength::Chunked | BodyLength::None => None,
BodyLength::Stream => {
skip_len = false;
None
}
BodyLength::Empty => req
.headers_mut()
.insert(CONTENT_LENGTH, HeaderValue::from_static("0")),
BodyLength::Sized(len) => req.headers_mut().insert(
CONTENT_LENGTH,
HeaderValue::try_from(format!("{}", len)).unwrap(),
),
BodyLength::Sized64(len) => req.headers_mut().insert(
CONTENT_LENGTH,
HeaderValue::try_from(format!("{}", len)).unwrap(),
),
};
// copy headers
for (key, value) in head.headers.iter() {
match *key {
CONNECTION | TRANSFER_ENCODING => continue, // http2 specific
CONTENT_LENGTH if skip_len => continue,
DATE => has_date = true,
_ => (),
}
req.headers_mut().append(key, value.clone());
}
2019-01-29 05:41:09 +01:00
match io.send_request(req, eof) {
2019-02-06 20:44:15 +01:00
Ok((res, send)) => {
2019-01-29 05:41:09 +01:00
release(io, pool, created, false);
if !eof {
Either::A(Either::B(
SendBody {
body,
send,
buf: None,
}
2019-02-06 20:44:15 +01:00
.and_then(move |_| res.map_err(SendRequestError::from)),
2019-01-29 05:41:09 +01:00
))
} else {
2019-02-06 20:44:15 +01:00
Either::B(res.map_err(SendRequestError::from))
2019-01-29 05:41:09 +01:00
}
}
Err(e) => {
release(io, pool, created, e.is_io());
Either::A(Either::A(err(e.into())))
}
}
})
.and_then(|resp| {
let (parts, body) = resp.into_parts();
let mut head = ResponseHead::default();
head.version = parts.version;
head.status = parts.status;
head.headers = parts.headers;
Ok(ClientResponse {
head,
2019-02-12 20:07:42 +01:00
payload: RefCell::new(body.into()),
2019-01-29 05:41:09 +01:00
})
})
.from_err()
}
struct SendBody<B: MessageBody> {
body: B,
send: SendStream<Bytes>,
buf: Option<Bytes>,
}
impl<B: MessageBody> Future for SendBody<B> {
type Item = ();
type Error = SendRequestError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
2019-01-29 19:14:00 +01:00
loop {
if self.buf.is_none() {
match self.body.poll_next() {
Ok(Async::Ready(Some(buf))) => {
self.send.reserve_capacity(buf.len());
self.buf = Some(buf);
}
Ok(Async::Ready(None)) => {
if let Err(e) = self.send.send_data(Bytes::new(), true) {
return Err(e.into());
}
self.send.reserve_capacity(0);
return Ok(Async::Ready(()));
2019-01-29 05:41:09 +01:00
}
2019-01-29 19:14:00 +01:00
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(e) => return Err(e.into()),
2019-01-29 05:41:09 +01:00
}
}
match self.send.poll_capacity() {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
Ok(Async::Ready(Some(cap))) => {
let mut buf = self.buf.take().unwrap();
let len = buf.len();
let bytes = buf.split_to(std::cmp::min(cap, len));
if let Err(e) = self.send.send_data(bytes, false) {
return Err(e.into());
} else {
if !buf.is_empty() {
self.send.reserve_capacity(buf.len());
self.buf = Some(buf);
}
2019-01-29 19:14:00 +01:00
continue;
2019-01-29 05:41:09 +01:00
}
}
Err(e) => return Err(e.into()),
}
}
}
}
// release SendRequest object
fn release<T: AsyncRead + AsyncWrite + 'static>(
io: SendRequest<Bytes>,
pool: Option<Acquired<T>>,
created: time::Instant,
close: bool,
) {
if let Some(mut pool) = pool {
if close {
pool.close(IoConnection::new(ConnectionType::H2(io), created, None));
} else {
pool.release(IoConnection::new(ConnectionType::H2(io), created, None));
}
}
}