1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-01-24 15:51:47 +01:00
actix-extras/src/client/parser.rs

207 lines
6.9 KiB
Rust
Raw Normal View History

2018-03-03 12:17:26 -08:00
use bytes::{Bytes, BytesMut};
2018-04-13 16:02:01 -07:00
use futures::{Async, Poll};
use http::header::{self, HeaderName, HeaderValue};
use http::{HeaderMap, HttpTryFrom, StatusCode, Version};
use httparse;
use std::mem;
2018-01-27 22:03:03 -08:00
use error::{ParseError, PayloadError};
2018-04-13 16:02:01 -07:00
use server::h1::{chunked, Decoder};
2018-01-27 22:03:03 -08:00
use server::{utils, IoStream};
use super::ClientResponse;
use super::response::ClientMessage;
2018-01-27 22:03:03 -08:00
const MAX_BUFFER_SIZE: usize = 131_072;
const MAX_HEADERS: usize = 96;
#[derive(Default)]
2018-01-27 22:03:03 -08:00
pub struct HttpResponseParser {
2018-02-19 03:11:11 -08:00
decoder: Option<Decoder>,
2018-01-27 22:03:03 -08:00
}
2018-02-19 13:18:18 -08:00
#[derive(Debug, Fail)]
2018-01-27 22:03:03 -08:00
pub enum HttpResponseParserError {
2018-02-19 13:18:18 -08:00
/// Server disconnected
2018-04-13 16:02:01 -07:00
#[fail(display = "Server disconnected")]
2018-01-27 22:03:03 -08:00
Disconnect,
2018-04-13 16:02:01 -07:00
#[fail(display = "{}", _0)]
2018-02-19 13:18:18 -08:00
Error(#[cause] ParseError),
2018-01-27 22:03:03 -08:00
}
impl HttpResponseParser {
2018-04-13 16:02:01 -07:00
pub fn parse<T>(
&mut self, io: &mut T, buf: &mut BytesMut
) -> Poll<ClientResponse, HttpResponseParserError>
where
T: IoStream,
2018-01-27 22:03:03 -08:00
{
// if buf is empty parse_message will always return NotReady, let's avoid that
2018-03-03 12:17:26 -08:00
if buf.is_empty() {
2018-01-27 22:03:03 -08:00
match utils::read_from_io(io, buf) {
2018-04-13 16:02:01 -07:00
Ok(Async::Ready(0)) => return Err(HttpResponseParserError::Disconnect),
2018-01-27 22:03:03 -08:00
Ok(Async::Ready(_)) => (),
2018-04-13 16:02:01 -07:00
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => return Err(HttpResponseParserError::Error(err.into())),
2018-01-27 22:03:03 -08:00
}
2018-03-03 12:17:26 -08:00
}
2018-01-27 22:03:03 -08:00
loop {
2018-03-03 12:17:26 -08:00
match HttpResponseParser::parse_message(buf)
.map_err(HttpResponseParserError::Error)?
{
2018-01-27 22:03:03 -08:00
Async::Ready((msg, decoder)) => {
2018-02-19 03:11:11 -08:00
self.decoder = decoder;
2018-01-27 22:03:03 -08:00
return Ok(Async::Ready(msg));
2018-04-13 16:02:01 -07:00
}
2018-01-27 22:03:03 -08:00
Async::NotReady => {
if buf.capacity() >= MAX_BUFFER_SIZE {
return Err(HttpResponseParserError::Error(ParseError::TooLarge));
}
2018-03-03 12:17:26 -08:00
match utils::read_from_io(io, buf) {
2018-04-13 16:02:01 -07:00
Ok(Async::Ready(0)) => {
return Err(HttpResponseParserError::Disconnect)
}
2018-03-03 12:17:26 -08:00
Ok(Async::Ready(_)) => (),
Ok(Async::NotReady) => return Ok(Async::NotReady),
2018-04-13 16:02:01 -07:00
Err(err) => {
return Err(HttpResponseParserError::Error(err.into()))
}
2018-01-27 22:03:03 -08:00
}
2018-04-13 16:02:01 -07:00
}
2018-01-27 22:03:03 -08:00
}
}
}
2018-04-13 16:02:01 -07:00
pub fn parse_payload<T>(
&mut self, io: &mut T, buf: &mut BytesMut
) -> Poll<Option<Bytes>, PayloadError>
where
T: IoStream,
2018-02-19 03:11:11 -08:00
{
2018-02-24 07:29:35 +03:00
if self.decoder.is_some() {
2018-03-03 12:17:26 -08:00
loop {
// read payload
let (not_ready, stream_finished) = match utils::read_from_io(io, buf) {
Ok(Async::Ready(0)) => (false, true),
2018-03-03 12:17:26 -08:00
Err(err) => return Err(err.into()),
Ok(Async::NotReady) => (true, false),
_ => (false, false),
2018-03-03 12:17:26 -08:00
};
2018-02-24 07:29:35 +03:00
2018-03-03 12:17:26 -08:00
match self.decoder.as_mut().unwrap().decode(buf) {
2018-04-13 16:02:01 -07:00
Ok(Async::Ready(Some(b))) => return Ok(Async::Ready(Some(b))),
2018-03-03 12:17:26 -08:00
Ok(Async::Ready(None)) => {
self.decoder.take();
2018-04-13 16:02:01 -07:00
return Ok(Async::Ready(None));
2018-03-03 12:17:26 -08:00
}
Ok(Async::NotReady) => {
if not_ready {
2018-04-13 16:02:01 -07:00
return Ok(Async::NotReady);
2018-03-03 12:17:26 -08:00
}
if stream_finished {
2018-04-13 16:02:01 -07:00
return Err(PayloadError::Incomplete);
}
2018-03-03 12:17:26 -08:00
}
Err(err) => return Err(err.into()),
2018-02-24 07:29:35 +03:00
}
}
2018-01-29 15:45:37 -08:00
} else {
2018-02-19 03:11:11 -08:00
Ok(Async::Ready(None))
2018-01-29 15:45:37 -08:00
}
}
2018-04-13 16:02:01 -07:00
fn parse_message(
buf: &mut BytesMut
) -> Poll<(ClientResponse, Option<Decoder>), ParseError> {
2018-01-27 22:03:03 -08:00
// Parse http message
let bytes_ptr = buf.as_ref().as_ptr() as usize;
let mut headers: [httparse::Header; MAX_HEADERS] =
2018-04-13 16:02:01 -07:00
unsafe { mem::uninitialized() };
2018-01-27 22:03:03 -08:00
let (len, version, status, headers_len) = {
2018-04-13 16:02:01 -07:00
let b = unsafe {
let b: &[u8] = buf;
mem::transmute(b)
};
2018-01-27 22:03:03 -08:00
let mut resp = httparse::Response::new(&mut headers);
match resp.parse(b)? {
httparse::Status::Complete(len) => {
let version = if resp.version.unwrap_or(1) == 1 {
2018-01-27 22:03:03 -08:00
Version::HTTP_11
} else {
Version::HTTP_10
};
let status = StatusCode::from_u16(resp.code.unwrap())
.map_err(|_| ParseError::Status)?;
(len, version, status, resp.headers.len())
}
httparse::Status::Partial => return Ok(Async::NotReady),
}
};
let slice = buf.split_to(len).freeze();
// convert headers
let mut hdrs = HeaderMap::new();
for header in headers[..headers_len].iter() {
if let Ok(name) = HeaderName::try_from(header.name) {
2018-01-27 22:03:03 -08:00
let v_start = header.value.as_ptr() as usize - bytes_ptr;
let v_end = v_start + header.value.len();
let value = unsafe {
2018-04-13 16:02:01 -07:00
HeaderValue::from_shared_unchecked(slice.slice(v_start, v_end))
};
2018-01-27 22:03:03 -08:00
hdrs.append(name, value);
} else {
2018-04-13 16:02:01 -07:00
return Err(ParseError::Header);
2018-01-27 22:03:03 -08:00
}
}
2018-02-26 13:58:23 -08:00
let decoder = if status == StatusCode::SWITCHING_PROTOCOLS {
Some(Decoder::eof())
} else if let Some(len) = hdrs.get(header::CONTENT_LENGTH) {
2018-01-27 22:03:03 -08:00
// Content-Length
if let Ok(s) = len.to_str() {
if let Ok(len) = s.parse::<u64>() {
Some(Decoder::length(len))
} else {
debug!("illegal Content-Length: {:?}", len);
2018-04-13 16:02:01 -07:00
return Err(ParseError::Header);
2018-01-27 22:03:03 -08:00
}
} else {
debug!("illegal Content-Length: {:?}", len);
2018-04-13 16:02:01 -07:00
return Err(ParseError::Header);
2018-01-27 22:03:03 -08:00
}
} else if chunked(&hdrs)? {
// Chunked encoding
Some(Decoder::chunked())
} else {
None
};
if let Some(decoder) = decoder {
2018-04-13 16:02:01 -07:00
Ok(Async::Ready((
ClientResponse::new(ClientMessage {
status,
version,
headers: hdrs,
cookies: None,
}),
Some(decoder),
)))
2018-01-27 22:03:03 -08:00
} else {
2018-04-13 16:02:01 -07:00
Ok(Async::Ready((
ClientResponse::new(ClientMessage {
status,
version,
headers: hdrs,
cookies: None,
}),
None,
)))
2018-01-27 22:03:03 -08:00
}
}
}