From 5c4e4edda4e057e9dac379b01c5b183a4c70278d Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 1 Apr 2019 11:51:18 -0700 Subject: [PATCH] add ClientResponse::json() --- awc/CHANGES.md | 2 + awc/Cargo.toml | 2 +- awc/src/error.rs | 27 ++++++ awc/src/lib.rs | 2 +- awc/src/response.rs | 194 ++++++++++++++++++++++++++++++++++++++++++-- awc/src/test.rs | 12 ++- src/error.rs | 2 +- 7 files changed, 232 insertions(+), 9 deletions(-) diff --git a/awc/CHANGES.md b/awc/CHANGES.md index 9ca5b22d..c3359318 100644 --- a/awc/CHANGES.md +++ b/awc/CHANGES.md @@ -9,6 +9,8 @@ * Export `MessageBody` type +* `ClientResponse::json()` - Loads and parse `application/json` encoded body + ### Changed diff --git a/awc/Cargo.toml b/awc/Cargo.toml index ef9143ec..fdaf0a55 100644 --- a/awc/Cargo.toml +++ b/awc/Cargo.toml @@ -44,6 +44,7 @@ bytes = "0.4" derive_more = "0.14" futures = "0.1.25" log =" 0.4" +mime = "0.3" percent-encoding = "1.0" rand = "0.6" serde = "1.0" @@ -62,6 +63,5 @@ actix-server = { version = "0.4.1", features=["ssl"] } brotli2 = { version="0.3.2" } flate2 = { version="1.0.2" } env_logger = "0.6" -mime = "0.3" rand = "0.6" tokio-tcp = "0.1" \ No newline at end of file diff --git a/awc/src/error.rs b/awc/src/error.rs index 8f51fd7d..bbfd9b97 100644 --- a/awc/src/error.rs +++ b/awc/src/error.rs @@ -4,6 +4,9 @@ pub use actix_http::error::PayloadError; pub use actix_http::ws::HandshakeError as WsHandshakeError; pub use actix_http::ws::ProtocolError as WsProtocolError; +use actix_http::{Response, ResponseError}; +use serde_json::error::Error as JsonError; + use actix_http::http::{header::HeaderValue, Error as HttpError, StatusCode}; use derive_more::{Display, From}; @@ -47,3 +50,27 @@ impl From for WsClientError { WsClientError::SendRequest(err.into()) } } + +/// A set of errors that can occur during parsing json payloads +#[derive(Debug, Display, From)] +pub enum JsonPayloadError { + /// Payload size is bigger than allowed. (default: 32kB) + #[display(fmt = "Json payload size is bigger than allowed.")] + Overflow, + /// Content type error + #[display(fmt = "Content type error")] + ContentType, + /// Deserialize error + #[display(fmt = "Json deserialize error: {}", _0)] + Deserialize(JsonError), + /// Payload error + #[display(fmt = "Error that occur during reading payload: {}", _0)] + Payload(PayloadError), +} + +/// Return `InternlaServerError` for `JsonPayloadError` +impl ResponseError for JsonPayloadError { + fn error_response(&self) -> Response { + Response::new(StatusCode::INTERNAL_SERVER_ERROR) + } +} diff --git a/awc/src/lib.rs b/awc/src/lib.rs index e2c04dbb..8d0ac6a5 100644 --- a/awc/src/lib.rs +++ b/awc/src/lib.rs @@ -39,7 +39,7 @@ pub mod ws; pub use self::builder::ClientBuilder; pub use self::request::ClientRequest; -pub use self::response::{ClientResponse, MessageBody}; +pub use self::response::{ClientResponse, JsonBody, MessageBody}; use self::connect::{Connect, ConnectorWrapper}; diff --git a/awc/src/response.rs b/awc/src/response.rs index 2548d719..b9173520 100644 --- a/awc/src/response.rs +++ b/awc/src/response.rs @@ -4,13 +4,14 @@ use std::fmt; use bytes::{Bytes, BytesMut}; use futures::{Future, Poll, Stream}; -use actix_http::error::PayloadError; +use actix_http::cookie::Cookie; +use actix_http::error::{CookieParseError, PayloadError}; use actix_http::http::header::{CONTENT_LENGTH, SET_COOKIE}; use actix_http::http::{HeaderMap, StatusCode, Version}; use actix_http::{Extensions, HttpMessage, Payload, PayloadStream, ResponseHead}; +use serde::de::DeserializeOwned; -use actix_http::cookie::Cookie; -use actix_http::error::CookieParseError; +use crate::error::JsonPayloadError; /// Client Response pub struct ClientResponse { @@ -104,10 +105,21 @@ impl ClientResponse where S: Stream + 'static, { - /// Load http response's body. + /// Loads http response's body. pub fn body(&mut self) -> MessageBody { MessageBody::new(self) } + + /// Loads and parse `application/json` encoded body. + /// Return `JsonBody` future. It resolves to a `T` value. + /// + /// Returns error: + /// + /// * content type is not `application/json` + /// * content length is greater than 256k + pub fn json(&mut self) -> JsonBody { + JsonBody::new(self) + } } impl Stream for ClientResponse @@ -230,12 +242,115 @@ where } } +/// Response's payload json parser, it resolves to a deserialized `T` value. +/// +/// Returns error: +/// +/// * content type is not `application/json` +/// * content length is greater than 64k +pub struct JsonBody { + limit: usize, + length: Option, + stream: Payload, + err: Option, + fut: Option>>, +} + +impl JsonBody +where + S: Stream + 'static, + U: DeserializeOwned, +{ + /// Create `JsonBody` for request. + pub fn new(req: &mut ClientResponse) -> Self { + // check content-type + let json = if let Ok(Some(mime)) = req.mime_type() { + mime.subtype() == mime::JSON || mime.suffix() == Some(mime::JSON) + } else { + false + }; + if !json { + return JsonBody { + limit: 65536, + length: None, + stream: Payload::None, + fut: None, + err: Some(JsonPayloadError::ContentType), + }; + } + + let mut len = None; + if let Some(l) = req.headers().get(CONTENT_LENGTH) { + if let Ok(s) = l.to_str() { + if let Ok(l) = s.parse::() { + len = Some(l) + } + } + } + + JsonBody { + limit: 65536, + length: len, + stream: req.take_payload(), + fut: None, + err: None, + } + } + + /// Change max size of payload. By default max size is 64Kb + pub fn limit(mut self, limit: usize) -> Self { + self.limit = limit; + self + } +} + +impl Future for JsonBody +where + T: Stream + 'static, + U: DeserializeOwned + 'static, +{ + type Item = U; + type Error = JsonPayloadError; + + fn poll(&mut self) -> Poll { + if let Some(ref mut fut) = self.fut { + return fut.poll(); + } + + if let Some(err) = self.err.take() { + return Err(err); + } + + let limit = self.limit; + if let Some(len) = self.length.take() { + if len > limit { + return Err(JsonPayloadError::Overflow); + } + } + + let fut = std::mem::replace(&mut self.stream, Payload::None) + .from_err() + .fold(BytesMut::with_capacity(8192), move |mut body, chunk| { + if (body.len() + chunk.len()) > limit { + Err(JsonPayloadError::Overflow) + } else { + body.extend_from_slice(&chunk); + Ok(body) + } + }) + .and_then(|body| Ok(serde_json::from_slice::(&body)?)); + self.fut = Some(Box::new(fut)); + self.poll() + } +} + #[cfg(test)] mod tests { use super::*; use futures::Async; + use serde::{Deserialize, Serialize}; - use crate::{http::header, test::TestResponse}; + use crate::{http::header, test::block_on, test::TestResponse}; #[test] fn test_body() { @@ -268,4 +383,73 @@ mod tests { _ => unreachable!("error"), } } + + #[derive(Serialize, Deserialize, PartialEq, Debug)] + struct MyObject { + name: String, + } + + fn json_eq(err: JsonPayloadError, other: JsonPayloadError) -> bool { + match err { + JsonPayloadError::Overflow => match other { + JsonPayloadError::Overflow => true, + _ => false, + }, + JsonPayloadError::ContentType => match other { + JsonPayloadError::ContentType => true, + _ => false, + }, + _ => false, + } + } + + #[test] + fn test_json_body() { + let mut req = TestResponse::default().finish(); + let json = block_on(JsonBody::<_, MyObject>::new(&mut req)); + assert!(json_eq(json.err().unwrap(), JsonPayloadError::ContentType)); + + let mut req = TestResponse::default() + .header( + header::CONTENT_TYPE, + header::HeaderValue::from_static("application/text"), + ) + .finish(); + let json = block_on(JsonBody::<_, MyObject>::new(&mut req)); + assert!(json_eq(json.err().unwrap(), JsonPayloadError::ContentType)); + + let mut req = TestResponse::default() + .header( + header::CONTENT_TYPE, + header::HeaderValue::from_static("application/json"), + ) + .header( + header::CONTENT_LENGTH, + header::HeaderValue::from_static("10000"), + ) + .finish(); + + let json = block_on(JsonBody::<_, MyObject>::new(&mut req).limit(100)); + assert!(json_eq(json.err().unwrap(), JsonPayloadError::Overflow)); + + let mut req = TestResponse::default() + .header( + header::CONTENT_TYPE, + header::HeaderValue::from_static("application/json"), + ) + .header( + header::CONTENT_LENGTH, + header::HeaderValue::from_static("16"), + ) + .set_payload(Bytes::from_static(b"{\"name\": \"test\"}")) + .finish(); + + let json = block_on(JsonBody::<_, MyObject>::new(&mut req)); + assert_eq!( + json.ok().unwrap(), + MyObject { + name: "test".to_owned() + } + ); + } } diff --git a/awc/src/test.rs b/awc/src/test.rs index 5e595d15..1c772905 100644 --- a/awc/src/test.rs +++ b/awc/src/test.rs @@ -6,6 +6,8 @@ use actix_http::http::header::{self, Header, HeaderValue, IntoHeaderValue}; use actix_http::http::{HeaderName, HttpTryFrom, Version}; use actix_http::{h1, Payload, ResponseHead}; use bytes::Bytes; +#[cfg(test)] +use futures::Future; use percent_encoding::{percent_encode, USERINFO_ENCODE_SET}; use crate::ClientResponse; @@ -18,7 +20,7 @@ thread_local! { } #[cfg(test)] -pub fn run_on(f: F) -> R +pub(crate) fn run_on(f: F) -> R where F: Fn() -> R, { @@ -29,6 +31,14 @@ where .unwrap() } +#[cfg(test)] +pub(crate) fn block_on(f: F) -> Result +where + F: Future, +{ + RT.with(move |rt| rt.borrow_mut().block_on(f)) +} + /// Test `ClientResponse` builder pub struct TestResponse { head: ResponseHead, diff --git a/src/error.rs b/src/error.rs index 02e17241..78dc2fb6 100644 --- a/src/error.rs +++ b/src/error.rs @@ -79,7 +79,7 @@ pub enum JsonPayloadError { Payload(PayloadError), } -/// Return `BadRequest` for `UrlencodedError` +/// Return `BadRequest` for `JsonPayloadError` impl ResponseError for JsonPayloadError { fn error_response(&self) -> HttpResponse { match *self {