1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-01-25 00:01:48 +01:00
actix-extras/src/client/h2proto.rs

187 lines
6.3 KiB
Rust
Raw Normal View History

2019-01-28 20:41:09 -08:00
use std::time;
use actix_codec::{AsyncRead, AsyncWrite};
use bytes::Bytes;
use futures::future::{err, Either};
2019-02-06 11:44:15 -08:00
use futures::{Async, Future, Poll};
2019-01-28 20:41:09 -08:00
use h2::{client::SendRequest, SendStream};
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, TRANSFER_ENCODING};
2019-03-11 16:42:33 -07:00
use http::{request::Request, HttpTryFrom, Method, Version};
2019-02-06 11:44:15 -08:00
use crate::body::{BodyLength, MessageBody};
2019-02-13 13:52:11 -08:00
use crate::message::{Message, RequestHead, ResponseHead};
2019-03-11 16:42:33 -07:00
use crate::payload::Payload;
2019-01-28 20:41:09 -08:00
use super::connection::{ConnectionType, IoConnection};
use super::error::SendRequestError;
use super::pool::Acquired;
use super::response::ClientResponse;
pub(crate) fn send_request<T, B>(
io: SendRequest<Bytes>,
head: RequestHead,
body: B,
created: time::Instant,
pool: Option<Acquired<T>>,
) -> impl Future<Item = ClientResponse, Error = SendRequestError>
where
T: AsyncRead + AsyncWrite + 'static,
B: MessageBody,
{
trace!("Sending client request: {:?} {:?}", head, body.length());
2019-03-11 16:42:33 -07:00
let head_req = head.method == Method::HEAD;
2019-02-06 11:44:15 -08:00
let length = body.length();
let eof = match length {
2019-01-28 20:41:09 -08:00
BodyLength::None | BodyLength::Empty | BodyLength::Sized(0) => true,
_ => false,
};
io.ready()
.map_err(SendRequestError::from)
.and_then(move |mut io| {
let mut req = Request::new(());
*req.uri_mut() = head.uri;
*req.method_mut() = head.method;
*req.version_mut() = Version::HTTP_2;
2019-02-06 11:44:15 -08:00
let mut skip_len = true;
// let mut has_date = false;
2019-02-06 11:44:15 -08:00
// Content length
let _ = match length {
2019-02-18 20:24:50 -08:00
BodyLength::None => None,
2019-02-06 11:44:15 -08:00
BodyLength::Stream => {
skip_len = false;
None
}
BodyLength::Empty => req
.headers_mut()
.insert(CONTENT_LENGTH, HeaderValue::from_static("0")),
BodyLength::Sized(len) => req.headers_mut().insert(
CONTENT_LENGTH,
HeaderValue::try_from(format!("{}", len)).unwrap(),
),
BodyLength::Sized64(len) => req.headers_mut().insert(
CONTENT_LENGTH,
HeaderValue::try_from(format!("{}", len)).unwrap(),
),
};
// copy headers
for (key, value) in head.headers.iter() {
match *key {
CONNECTION | TRANSFER_ENCODING => continue, // http2 specific
CONTENT_LENGTH if skip_len => continue,
// DATE => has_date = true,
2019-02-06 11:44:15 -08:00
_ => (),
}
req.headers_mut().append(key, value.clone());
}
2019-01-28 20:41:09 -08:00
match io.send_request(req, eof) {
2019-02-06 11:44:15 -08:00
Ok((res, send)) => {
2019-01-28 20:41:09 -08:00
release(io, pool, created, false);
if !eof {
Either::A(Either::B(
SendBody {
body,
send,
buf: None,
}
2019-02-06 11:44:15 -08:00
.and_then(move |_| res.map_err(SendRequestError::from)),
2019-01-28 20:41:09 -08:00
))
} else {
2019-02-06 11:44:15 -08:00
Either::B(res.map_err(SendRequestError::from))
2019-01-28 20:41:09 -08:00
}
}
Err(e) => {
release(io, pool, created, e.is_io());
Either::A(Either::A(err(e.into())))
}
}
})
2019-03-11 16:42:33 -07:00
.and_then(move |resp| {
2019-01-28 20:41:09 -08:00
let (parts, body) = resp.into_parts();
2019-03-11 16:42:33 -07:00
let payload = if head_req { Payload::None } else { body.into() };
2019-01-28 20:41:09 -08:00
2019-02-13 13:52:11 -08:00
let mut head: Message<ResponseHead> = Message::new();
2019-01-28 20:41:09 -08:00
head.version = parts.version;
head.status = parts.status;
head.headers = parts.headers;
2019-03-11 16:42:33 -07:00
Ok(ClientResponse { head, payload })
2019-01-28 20:41:09 -08:00
})
.from_err()
}
/// Future that writes a request body into an HTTP/2 send stream.
struct SendBody<B>
where
    B: MessageBody,
{
    /// Source of the body chunks.
    body: B,
    /// Sending half of the HTTP/2 stream the chunks are written to.
    send: SendStream<Bytes>,
    /// Chunk taken from `body` that has not been completely written yet.
    buf: Option<Bytes>,
}
impl<B: MessageBody> Future for SendBody<B> {
type Item = ();
type Error = SendRequestError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
2019-01-29 10:14:00 -08:00
loop {
if self.buf.is_none() {
match self.body.poll_next() {
Ok(Async::Ready(Some(buf))) => {
self.send.reserve_capacity(buf.len());
self.buf = Some(buf);
}
Ok(Async::Ready(None)) => {
if let Err(e) = self.send.send_data(Bytes::new(), true) {
return Err(e.into());
}
self.send.reserve_capacity(0);
return Ok(Async::Ready(()));
2019-01-28 20:41:09 -08:00
}
2019-01-29 10:14:00 -08:00
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(e) => return Err(e.into()),
2019-01-28 20:41:09 -08:00
}
}
match self.send.poll_capacity() {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
Ok(Async::Ready(Some(cap))) => {
let mut buf = self.buf.take().unwrap();
let len = buf.len();
let bytes = buf.split_to(std::cmp::min(cap, len));
if let Err(e) = self.send.send_data(bytes, false) {
return Err(e.into());
} else {
if !buf.is_empty() {
self.send.reserve_capacity(buf.len());
self.buf = Some(buf);
}
2019-01-29 10:14:00 -08:00
continue;
2019-01-28 20:41:09 -08:00
}
}
Err(e) => return Err(e.into()),
}
}
}
}
/// Hands the `SendRequest` handle back to the connection pool, if there is one.
///
/// The handle is wrapped in a new HTTP/2 `IoConnection` stamped with
/// `created`. When `close` is true the pool is asked to close the connection,
/// otherwise to release it. Without a pool nothing is done and the handle is
/// dropped.
fn release<T: AsyncRead + AsyncWrite + 'static>(
    io: SendRequest<Bytes>,
    pool: Option<Acquired<T>>,
    created: time::Instant,
    close: bool,
) {
    let mut pool = match pool {
        Some(pool) => pool,
        None => return,
    };

    let conn = IoConnection::new(ConnectionType::H2(io), created, None);
    if close {
        pool.close(conn);
    } else {
        pool.release(conn);
    }
}