1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-01-31 19:10:07 +01:00

185 lines
5.6 KiB
Rust
Raw Normal View History

2019-01-28 20:41:09 -08:00
use std::time;
use actix_codec::{AsyncRead, AsyncWrite};
use bytes::Bytes;
2019-11-26 11:25:50 +06:00
use futures::future::poll_fn;
2019-01-28 20:41:09 -08:00
use h2::{client::SendRequest, SendStream};
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, TRANSFER_ENCODING};
2019-03-11 16:42:33 -07:00
use http::{request::Request, HttpTryFrom, Method, Version};
2019-02-06 11:44:15 -08:00
2019-03-27 09:24:55 -07:00
use crate::body::{BodySize, MessageBody};
2019-09-12 21:52:46 +06:00
use crate::header::HeaderMap;
use crate::message::{RequestHeadType, ResponseHead};
2019-03-11 16:42:33 -07:00
use crate::payload::Payload;
2019-01-28 20:41:09 -08:00
use super::connection::{ConnectionType, IoConnection};
use super::error::SendRequestError;
use super::pool::Acquired;
2019-11-18 18:42:27 +06:00
pub(crate) async fn send_request<T, B>(
mut io: SendRequest<Bytes>,
head: RequestHeadType,
2019-01-28 20:41:09 -08:00
body: B,
created: time::Instant,
pool: Option<Acquired<T>>,
2019-11-18 18:42:27 +06:00
) -> Result<(ResponseHead, Payload), SendRequestError>
2019-01-28 20:41:09 -08:00
where
2019-11-18 18:42:27 +06:00
T: AsyncRead + AsyncWrite + Unpin + 'static,
2019-01-28 20:41:09 -08:00
B: MessageBody,
{
trace!("Sending client request: {:?} {:?}", head, body.size());
let head_req = head.as_ref().method == Method::HEAD;
let length = body.size();
2019-02-06 11:44:15 -08:00
let eof = match length {
2019-03-27 09:24:55 -07:00
BodySize::None | BodySize::Empty | BodySize::Sized(0) => true,
2019-01-28 20:41:09 -08:00
_ => false,
};
2019-11-18 18:42:27 +06:00
let mut req = Request::new(());
*req.uri_mut() = head.as_ref().uri.clone();
*req.method_mut() = head.as_ref().method.clone();
*req.version_mut() = Version::HTTP_2;
2019-02-06 11:44:15 -08:00
2019-11-18 18:42:27 +06:00
let mut skip_len = true;
// let mut has_date = false;
// Content length
let _ = match length {
BodySize::None => None,
BodySize::Stream => {
skip_len = false;
None
}
BodySize::Empty => req
.headers_mut()
.insert(CONTENT_LENGTH, HeaderValue::from_static("0")),
BodySize::Sized(len) => req.headers_mut().insert(
CONTENT_LENGTH,
HeaderValue::try_from(format!("{}", len)).unwrap(),
),
BodySize::Sized64(len) => req.headers_mut().insert(
CONTENT_LENGTH,
HeaderValue::try_from(format!("{}", len)).unwrap(),
),
};
// Extracting extra headers from RequestHeadType. HeaderMap::new() does not allocate.
let (head, extra_headers) = match head {
RequestHeadType::Owned(head) => (RequestHeadType::Owned(head), HeaderMap::new()),
RequestHeadType::Rc(head, extra_headers) => (
RequestHeadType::Rc(head, None),
extra_headers.unwrap_or_else(HeaderMap::new),
),
};
// merging headers from head and extra headers.
let headers = head
.as_ref()
.headers
.iter()
.filter(|(name, _)| !extra_headers.contains_key(*name))
.chain(extra_headers.iter());
// copy headers
for (key, value) in headers {
match *key {
CONNECTION | TRANSFER_ENCODING => continue, // http2 specific
CONTENT_LENGTH if skip_len => continue,
// DATE => has_date = true,
_ => (),
}
req.headers_mut().append(key, value.clone());
}
let res = poll_fn(|cx| io.poll_ready(cx)).await;
if let Err(e) = res {
release(io, pool, created, e.is_io());
return Err(SendRequestError::from(e));
}
let resp = match io.send_request(req, eof) {
Ok((fut, send)) => {
release(io, pool, created, false);
if !eof {
send_body(body, send).await?;
2019-01-28 20:41:09 -08:00
}
2019-11-18 18:42:27 +06:00
fut.await.map_err(SendRequestError::from)?
}
Err(e) => {
release(io, pool, created, e.is_io());
return Err(e.into());
}
};
2019-01-28 20:41:09 -08:00
2019-11-18 18:42:27 +06:00
let (parts, body) = resp.into_parts();
let payload = if head_req { Payload::None } else { body.into() };
let mut head = ResponseHead::new(parts.status);
head.version = parts.version;
head.headers = parts.headers.into();
Ok((head, payload))
2019-01-28 20:41:09 -08:00
}
2019-11-18 18:42:27 +06:00
/// Streams `body` into the h2 send stream, chunk by chunk.
///
/// Each chunk pulled from `body` is written in pieces no larger than the capacity
/// reported by `send.poll_capacity`. Once the body is exhausted, an empty data frame
/// with the end-of-stream flag is sent.
///
/// # Errors
///
/// Returns `SendRequestError` when the body yields an error, when waiting for
/// capacity fails, or when writing a data frame fails.
async fn send_body<B: MessageBody>(
    mut body: B,
    mut send: SendStream<Bytes>,
) -> Result<(), SendRequestError> {
    // Chunk currently being written; `None` means the next chunk must be pulled.
    let mut buf = None;

    loop {
        if buf.is_none() {
            match poll_fn(|cx| body.poll_next(cx)).await {
                Some(Ok(b)) => {
                    // An empty chunk carries no data. Reserving zero capacity and then
                    // waiting on `poll_capacity` could stall the upload, so skip it.
                    // NOTE(review): based on h2 only signalling newly assigned
                    // capacity; confirm against the h2 `SendStream` documentation.
                    if b.is_empty() {
                        continue;
                    }
                    send.reserve_capacity(b.len());
                    buf = Some(b);
                }
                Some(Err(e)) => return Err(e.into()),
                None => {
                    // Body exhausted: close the stream with an empty final frame.
                    send.send_data(Bytes::new(), true)?;
                    send.reserve_capacity(0);
                    return Ok(());
                }
            }
        }

        match poll_fn(|cx| send.poll_capacity(cx)).await {
            // No more capacity will be reported for this stream; stop sending.
            None => return Ok(()),
            Some(Ok(cap)) => {
                let b = buf.as_mut().unwrap();
                let len = b.len();
                // Send at most `cap` bytes; the remainder stays in `b`.
                let bytes = b.split_to(std::cmp::min(cap, len));
                send.send_data(bytes, false)?;

                if b.is_empty() {
                    buf = None;
                } else {
                    // Ask for capacity for what is left of the current chunk.
                    send.reserve_capacity(b.len());
                }
            }
            Some(Err(e)) => return Err(e.into()),
        }
    }
}
// release SendRequest object
2019-11-18 18:42:27 +06:00
fn release<T: AsyncRead + AsyncWrite + Unpin + 'static>(
2019-01-28 20:41:09 -08:00
io: SendRequest<Bytes>,
pool: Option<Acquired<T>>,
created: time::Instant,
close: bool,
) {
if let Some(mut pool) = pool {
if close {
pool.close(IoConnection::new(ConnectionType::H2(io), created, None));
} else {
pool.release(IoConnection::new(ConnectionType::H2(io), created, None));
}
}
}