From cd83553db7a04f60d54f09ddc0e56c7e89a5fcc0 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 7 Feb 2019 11:06:05 -0800 Subject: [PATCH] simplify payload api; add missing http error helper functions --- Cargo.toml | 1 - examples/echo.rs | 4 +- examples/echo2.rs | 4 +- src/client/h1proto.rs | 21 +-- src/client/h2proto.rs | 3 +- src/client/request.rs | 19 +-- src/client/response.rs | 22 ++- src/error.rs | 314 +++++++++++++++++++++++++++++++++++++++- src/h1/dispatcher.rs | 2 +- src/h1/mod.rs | 2 + src/{ => h1}/payload.rs | 0 src/httpcodes.rs | 1 + src/httpmessage.rs | 137 ++++++++++-------- src/json.rs | 12 +- src/lib.rs | 2 - src/request.rs | 11 +- src/service/mod.rs | 2 + src/test.rs | 2 +- tests/test_client.rs | 4 +- tests/test_server.rs | 20 +-- tests/test_ws.rs | 34 ----- 21 files changed, 442 insertions(+), 175 deletions(-) rename src/{ => h1}/payload.rs (100%) diff --git a/Cargo.toml b/Cargo.toml index 8cc74446..23938f2f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -75,7 +75,6 @@ openssl = { version="0.10", optional = true } [dev-dependencies] actix-rt = "0.1.0" -actix-web = "0.7" actix-server = { version="0.2", features=["ssl"] } actix-connector = { version="0.2.0", features=["ssl"] } actix-utils = "0.2.1" diff --git a/examples/echo.rs b/examples/echo.rs index 3bfb04d7..03d5b470 100644 --- a/examples/echo.rs +++ b/examples/echo.rs @@ -18,8 +18,8 @@ fn main() { .client_timeout(1000) .client_disconnect(1000) .server_hostname("localhost") - .finish(|_req: Request| { - _req.body().limit(512).and_then(|bytes: Bytes| { + .finish(|mut req: Request| { + req.body().limit(512).and_then(|bytes: Bytes| { info!("request body: {:?}", bytes); let mut res = Response::Ok(); res.header("x-head", HeaderValue::from_static("dummy value!")); diff --git a/examples/echo2.rs b/examples/echo2.rs index 0e2bc9d5..2fd9cbcf 100644 --- a/examples/echo2.rs +++ b/examples/echo2.rs @@ -8,8 +8,8 @@ use futures::Future; use log::info; use std::env; -fn handle_request(_req: Request) -> impl Future { - _req.body().limit(512).from_err().and_then(|bytes: Bytes| { +fn handle_request(mut req: Request) -> impl Future { + req.body().limit(512).from_err().and_then(|bytes: Bytes| { info!("request body: {:?}", bytes); let mut res = Response::Ok(); res.header("x-head", HeaderValue::from_static("dummy value!")); diff --git a/src/client/h1proto.rs b/src/client/h1proto.rs index 86fc10b8..59a03ef4 100644 --- a/src/client/h1proto.rs +++ b/src/client/h1proto.rs @@ -50,14 +50,14 @@ where .into_future() .map_err(|(e, _)| SendRequestError::from(e)) .and_then(|(item, framed)| { - if let Some(res) = item { + if let Some(mut res) = item { match framed.get_codec().message_type() { h1::MessageType::None => { let force_close = !framed.get_codec().keepalive(); release_connection(framed, force_close) } _ => { - *res.payload.borrow_mut() = Some(Payload::stream(framed)) + res.set_payload(Payload::stream(framed)); } } ok(res) @@ -199,27 +199,10 @@ where } } -struct EmptyPayload; - -impl Stream for EmptyPayload { - type Item = Bytes; - type Error = PayloadError; - - fn poll(&mut self) -> Poll, Self::Error> { - Ok(Async::Ready(None)) - } -} - pub(crate) struct Payload { framed: Option>, } -impl Payload<()> { - pub fn empty() -> PayloadStream { - Box::new(EmptyPayload) - } -} - impl Payload { pub fn stream(framed: Framed) -> PayloadStream { Box::new(Payload { diff --git a/src/client/h2proto.rs b/src/client/h2proto.rs index ecd18cf8..f2f18d93 100644 --- a/src/client/h2proto.rs +++ b/src/client/h2proto.rs @@ -1,4 +1,3 @@ -use std::cell::RefCell; use std::time; use actix_codec::{AsyncRead, AsyncWrite}; @@ -111,7 +110,7 @@ where Ok(ClientResponse { head, - payload: RefCell::new(Some(Box::new(Payload::new(body)))), + payload: Some(Box::new(Payload::new(body))), }) }) .from_err() diff --git a/src/client/request.rs b/src/client/request.rs index b80f0e6d..7e971756 100644 --- a/src/client/request.rs +++ b/src/client/request.rs @@ -268,10 +268,9 @@ impl ClientRequestBuilder { /// /// ```rust /// # extern crate mime; - /// # extern crate actix_web; - /// # use actix_web::client::*; + /// # extern crate actix_http; /// # - /// use actix_web::{client, http}; + /// use actix_http::{client, http}; /// /// fn main() { /// let req = client::ClientRequest::build() @@ -299,16 +298,14 @@ impl ClientRequestBuilder { /// To override header use `set_header()` method. /// /// ```rust - /// # extern crate http; - /// # extern crate actix_web; - /// # use actix_web::client::*; + /// # extern crate actix_http; /// # - /// use http::header; + /// use actix_http::{client, http}; /// /// fn main() { - /// let req = ClientRequest::build() + /// let req = client::ClientRequest::build() /// .header("X-TEST", "value") - /// .header(header::CONTENT_TYPE, "application/json") + /// .header(http::header::CONTENT_TYPE, "application/json") /// .finish() /// .unwrap(); /// } @@ -427,8 +424,8 @@ impl ClientRequestBuilder { /// Set a cookie /// /// ```rust - /// # extern crate actix_web; - /// use actix_web::{client, http}; + /// # extern crate actix_http; + /// use actix_http::{client, http}; /// /// fn main() { /// let req = client::ClientRequest::build() diff --git a/src/client/response.rs b/src/client/response.rs index 6224d3cb..65c59f2a 100644 --- a/src/client/response.rs +++ b/src/client/response.rs @@ -1,4 +1,3 @@ -use std::cell::RefCell; use std::fmt; use bytes::Bytes; @@ -10,13 +9,11 @@ use crate::error::PayloadError; use crate::httpmessage::HttpMessage; use crate::message::{Head, ResponseHead}; -use super::h1proto::Payload; - /// Client Response #[derive(Default)] pub struct ClientResponse { pub(crate) head: ResponseHead, - pub(crate) payload: RefCell>, + pub(crate) payload: Option, } impl HttpMessage for ClientResponse { @@ -27,12 +24,8 @@ impl HttpMessage for ClientResponse { } #[inline] - fn payload(self) -> Self::Stream { - if let Some(payload) = self.payload.borrow_mut().take() { - payload - } else { - Payload::empty() - } + fn payload(&mut self) -> Option { + self.payload.take() } } @@ -41,7 +34,7 @@ impl ClientResponse { pub fn new() -> ClientResponse { ClientResponse { head: ResponseHead::default(), - payload: RefCell::new(None), + payload: None, } } @@ -84,6 +77,11 @@ impl ClientResponse { pub fn keep_alive(&self) -> bool { self.head().keep_alive() } + + /// Set response payload + pub fn set_payload(&mut self, payload: PayloadStream) { + self.payload = Some(payload); + } } impl Stream for ClientResponse { @@ -91,7 +89,7 @@ impl Stream for ClientResponse { type Error = PayloadError; fn poll(&mut self) -> Poll, Self::Error> { - if let Some(ref mut payload) = &mut *self.payload.borrow_mut() { + if let Some(ref mut payload) = self.payload { payload.poll() } else { Ok(Async::Ready(None)) diff --git a/src/error.rs b/src/error.rs index 03224b55..f71b429f 100644 --- a/src/error.rs +++ b/src/error.rs @@ -344,7 +344,7 @@ pub enum PayloadError { UnknownLength, /// Http2 payload error #[display(fmt = "{}", _0)] - H2Payload(h2::Error), + Http2Payload(h2::Error), } impl From for PayloadError { @@ -642,6 +642,16 @@ where InternalError::new(err, StatusCode::UNAUTHORIZED).into() } +/// Helper function that creates wrapper of any error and generate +/// *PAYMENT_REQUIRED* response. +#[allow(non_snake_case)] +pub fn ErrorPaymentRequired(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::PAYMENT_REQUIRED).into() +} + /// Helper function that creates wrapper of any error and generate *FORBIDDEN* /// response. #[allow(non_snake_case)] @@ -672,6 +682,26 @@ where InternalError::new(err, StatusCode::METHOD_NOT_ALLOWED).into() } +/// Helper function that creates wrapper of any error and generate *NOT +/// ACCEPTABLE* response. +#[allow(non_snake_case)] +pub fn ErrorNotAcceptable(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::NOT_ACCEPTABLE).into() +} + +/// Helper function that creates wrapper of any error and generate *PROXY +/// AUTHENTICATION REQUIRED* response. +#[allow(non_snake_case)] +pub fn ErrorProxyAuthenticationRequired(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::PROXY_AUTHENTICATION_REQUIRED).into() +} + /// Helper function that creates wrapper of any error and generate *REQUEST /// TIMEOUT* response. #[allow(non_snake_case)] @@ -702,6 +732,116 @@ where InternalError::new(err, StatusCode::GONE).into() } +/// Helper function that creates wrapper of any error and generate *LENGTH +/// REQUIRED* response. +#[allow(non_snake_case)] +pub fn ErrorLengthRequired(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::LENGTH_REQUIRED).into() +} + +/// Helper function that creates wrapper of any error and generate +/// *PAYLOAD TOO LARGE* response. +#[allow(non_snake_case)] +pub fn ErrorPayloadTooLarge(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::PAYLOAD_TOO_LARGE).into() +} + +/// Helper function that creates wrapper of any error and generate +/// *URI TOO LONG* response. +#[allow(non_snake_case)] +pub fn ErrorUriTooLong(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::URI_TOO_LONG).into() +} + +/// Helper function that creates wrapper of any error and generate +/// *UNSUPPORTED MEDIA TYPE* response. +#[allow(non_snake_case)] +pub fn ErrorUnsupportedMediaType(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::UNSUPPORTED_MEDIA_TYPE).into() +} + +/// Helper function that creates wrapper of any error and generate +/// *RANGE NOT SATISFIABLE* response. +#[allow(non_snake_case)] +pub fn ErrorRangeNotSatisfiable(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::RANGE_NOT_SATISFIABLE).into() +} + +/// Helper function that creates wrapper of any error and generate +/// *IM A TEAPOT* response. +#[allow(non_snake_case)] +pub fn ErrorImATeapot(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::IM_A_TEAPOT).into() +} + +/// Helper function that creates wrapper of any error and generate +/// *MISDIRECTED REQUEST* response. +#[allow(non_snake_case)] +pub fn ErrorMisdirectedRequest(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::MISDIRECTED_REQUEST).into() +} + +/// Helper function that creates wrapper of any error and generate +/// *UNPROCESSABLE ENTITY* response. +#[allow(non_snake_case)] +pub fn ErrorUnprocessableEntity(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::UNPROCESSABLE_ENTITY).into() +} + +/// Helper function that creates wrapper of any error and generate +/// *LOCKED* response. +#[allow(non_snake_case)] +pub fn ErrorLocked(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::LOCKED).into() +} + +/// Helper function that creates wrapper of any error and generate +/// *FAILED DEPENDENCY* response. +#[allow(non_snake_case)] +pub fn ErrorFailedDependency(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::FAILED_DEPENDENCY).into() +} + +/// Helper function that creates wrapper of any error and generate +/// *UPGRADE REQUIRED* response. +#[allow(non_snake_case)] +pub fn ErrorUpgradeRequired(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::UPGRADE_REQUIRED).into() +} + /// Helper function that creates wrapper of any error and generate /// *PRECONDITION FAILED* response. #[allow(non_snake_case)] @@ -712,6 +852,46 @@ where InternalError::new(err, StatusCode::PRECONDITION_FAILED).into() } +/// Helper function that creates wrapper of any error and generate +/// *PRECONDITION REQUIRED* response. +#[allow(non_snake_case)] +pub fn ErrorPreconditionRequired(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::PRECONDITION_REQUIRED).into() +} + +/// Helper function that creates wrapper of any error and generate +/// *TOO MANY REQUESTS* response. +#[allow(non_snake_case)] +pub fn ErrorTooManyRequests(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::TOO_MANY_REQUESTS).into() +} + +/// Helper function that creates wrapper of any error and generate +/// *REQUEST HEADER FIELDS TOO LARGE* response. +#[allow(non_snake_case)] +pub fn ErrorRequestHeaderFieldsTooLarge(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE).into() +} + +/// Helper function that creates wrapper of any error and generate +/// *UNAVAILABLE FOR LEGAL REASONS* response. +#[allow(non_snake_case)] +pub fn ErrorUnavailableForLegalReasons(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::UNAVAILABLE_FOR_LEGAL_REASONS).into() +} + /// Helper function that creates wrapper of any error and generate /// *EXPECTATION FAILED* response. #[allow(non_snake_case)] @@ -772,6 +952,66 @@ where InternalError::new(err, StatusCode::GATEWAY_TIMEOUT).into() } +/// Helper function that creates wrapper of any error and +/// generate *HTTP VERSION NOT SUPPORTED* response. +#[allow(non_snake_case)] +pub fn ErrorHttpVersionNotSupported(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::HTTP_VERSION_NOT_SUPPORTED).into() +} + +/// Helper function that creates wrapper of any error and +/// generate *VARIANT ALSO NEGOTIATES* response. +#[allow(non_snake_case)] +pub fn ErrorVariantAlsoNegotiates(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::VARIANT_ALSO_NEGOTIATES).into() +} + +/// Helper function that creates wrapper of any error and +/// generate *INSUFFICIENT STORAGE* response. +#[allow(non_snake_case)] +pub fn ErrorInsufficientStorage(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::INSUFFICIENT_STORAGE).into() +} + +/// Helper function that creates wrapper of any error and +/// generate *LOOP DETECTED* response. +#[allow(non_snake_case)] +pub fn ErrorLoopDetected(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::LOOP_DETECTED).into() +} + +/// Helper function that creates wrapper of any error and +/// generate *NOT EXTENDED* response. +#[allow(non_snake_case)] +pub fn ErrorNotExtended(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::NOT_EXTENDED).into() +} + +/// Helper function that creates wrapper of any error and +/// generate *NETWORK AUTHENTICATION REQUIRED* response. +#[allow(non_snake_case)] +pub fn ErrorNetworkAuthenticationRequired(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::NETWORK_AUTHENTICATION_REQUIRED).into() +} + #[cfg(test)] mod tests { use super::*; @@ -926,6 +1166,9 @@ mod tests { let r: Response = ErrorUnauthorized("err").into(); assert_eq!(r.status(), StatusCode::UNAUTHORIZED); + let r: Response = ErrorPaymentRequired("err").into(); + assert_eq!(r.status(), StatusCode::PAYMENT_REQUIRED); + let r: Response = ErrorForbidden("err").into(); assert_eq!(r.status(), StatusCode::FORBIDDEN); @@ -935,6 +1178,12 @@ mod tests { let r: Response = ErrorMethodNotAllowed("err").into(); assert_eq!(r.status(), StatusCode::METHOD_NOT_ALLOWED); + let r: Response = ErrorNotAcceptable("err").into(); + assert_eq!(r.status(), StatusCode::NOT_ACCEPTABLE); + + let r: Response = ErrorProxyAuthenticationRequired("err").into(); + assert_eq!(r.status(), StatusCode::PROXY_AUTHENTICATION_REQUIRED); + let r: Response = ErrorRequestTimeout("err").into(); assert_eq!(r.status(), StatusCode::REQUEST_TIMEOUT); @@ -944,12 +1193,57 @@ mod tests { let r: Response = ErrorGone("err").into(); assert_eq!(r.status(), StatusCode::GONE); + let r: Response = ErrorLengthRequired("err").into(); + assert_eq!(r.status(), StatusCode::LENGTH_REQUIRED); + let r: Response = ErrorPreconditionFailed("err").into(); assert_eq!(r.status(), StatusCode::PRECONDITION_FAILED); + let r: Response = ErrorPayloadTooLarge("err").into(); + assert_eq!(r.status(), StatusCode::PAYLOAD_TOO_LARGE); + + let r: Response = ErrorUriTooLong("err").into(); + assert_eq!(r.status(), StatusCode::URI_TOO_LONG); + + let r: Response = ErrorUnsupportedMediaType("err").into(); + assert_eq!(r.status(), StatusCode::UNSUPPORTED_MEDIA_TYPE); + + let r: Response = ErrorRangeNotSatisfiable("err").into(); + assert_eq!(r.status(), StatusCode::RANGE_NOT_SATISFIABLE); + let r: Response = ErrorExpectationFailed("err").into(); assert_eq!(r.status(), StatusCode::EXPECTATION_FAILED); + let r: Response = ErrorImATeapot("err").into(); + assert_eq!(r.status(), StatusCode::IM_A_TEAPOT); + + let r: Response = ErrorMisdirectedRequest("err").into(); + assert_eq!(r.status(), StatusCode::MISDIRECTED_REQUEST); + + let r: Response = ErrorUnprocessableEntity("err").into(); + assert_eq!(r.status(), StatusCode::UNPROCESSABLE_ENTITY); + + let r: Response = ErrorLocked("err").into(); + assert_eq!(r.status(), StatusCode::LOCKED); + + let r: Response = ErrorFailedDependency("err").into(); + assert_eq!(r.status(), StatusCode::FAILED_DEPENDENCY); + + let r: Response = ErrorUpgradeRequired("err").into(); + assert_eq!(r.status(), StatusCode::UPGRADE_REQUIRED); + + let r: Response = ErrorPreconditionRequired("err").into(); + assert_eq!(r.status(), StatusCode::PRECONDITION_REQUIRED); + + let r: Response = ErrorTooManyRequests("err").into(); + assert_eq!(r.status(), StatusCode::TOO_MANY_REQUESTS); + + let r: Response = ErrorRequestHeaderFieldsTooLarge("err").into(); + assert_eq!(r.status(), StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE); + + let r: Response = ErrorUnavailableForLegalReasons("err").into(); + assert_eq!(r.status(), StatusCode::UNAVAILABLE_FOR_LEGAL_REASONS); + let r: Response = ErrorInternalServerError("err").into(); assert_eq!(r.status(), StatusCode::INTERNAL_SERVER_ERROR); @@ -964,5 +1258,23 @@ mod tests { let r: Response = ErrorGatewayTimeout("err").into(); assert_eq!(r.status(), StatusCode::GATEWAY_TIMEOUT); + + let r: Response = ErrorHttpVersionNotSupported("err").into(); + assert_eq!(r.status(), StatusCode::HTTP_VERSION_NOT_SUPPORTED); + + let r: Response = ErrorVariantAlsoNegotiates("err").into(); + assert_eq!(r.status(), StatusCode::VARIANT_ALSO_NEGOTIATES); + + let r: Response = ErrorInsufficientStorage("err").into(); + assert_eq!(r.status(), StatusCode::INSUFFICIENT_STORAGE); + + let r: Response = ErrorLoopDetected("err").into(); + assert_eq!(r.status(), StatusCode::LOOP_DETECTED); + + let r: Response = ErrorNotExtended("err").into(); + assert_eq!(r.status(), StatusCode::NOT_EXTENDED); + + let r: Response = ErrorNetworkAuthenticationRequired("err").into(); + assert_eq!(r.status(), StatusCode::NETWORK_AUTHENTICATION_REQUIRED); } } diff --git a/src/h1/dispatcher.rs b/src/h1/dispatcher.rs index 1295dfdd..0a0ed04e 100644 --- a/src/h1/dispatcher.rs +++ b/src/h1/dispatcher.rs @@ -14,11 +14,11 @@ use crate::body::{Body, BodyLength, MessageBody, ResponseBody}; use crate::config::ServiceConfig; use crate::error::DispatchError; use crate::error::{ParseError, PayloadError}; -use crate::payload::{Payload, PayloadSender, PayloadStatus, PayloadWriter}; use crate::request::Request; use crate::response::Response; use super::codec::Codec; +use super::payload::{Payload, PayloadSender, PayloadStatus, PayloadWriter}; use super::{H1ServiceResult, Message, MessageType}; const MAX_PIPELINED_MESSAGES: usize = 16; diff --git a/src/h1/mod.rs b/src/h1/mod.rs index a375b60d..9054c266 100644 --- a/src/h1/mod.rs +++ b/src/h1/mod.rs @@ -9,11 +9,13 @@ mod codec; mod decoder; mod dispatcher; mod encoder; +mod payload; mod service; pub use self::client::{ClientCodec, ClientPayloadCodec}; pub use self::codec::Codec; pub use self::dispatcher::Dispatcher; +pub use self::payload::{Payload, PayloadBuffer}; pub use self::service::{H1Service, H1ServiceHandler, OneRequest}; use crate::request::Request; diff --git a/src/payload.rs b/src/h1/payload.rs similarity index 100% rename from src/payload.rs rename to src/h1/payload.rs diff --git a/src/httpcodes.rs b/src/httpcodes.rs index 7806bd80..5dfeefa9 100644 --- a/src/httpcodes.rs +++ b/src/httpcodes.rs @@ -54,6 +54,7 @@ impl Response { STATIC_RESP!(Gone, StatusCode::GONE); STATIC_RESP!(LengthRequired, StatusCode::LENGTH_REQUIRED); STATIC_RESP!(PreconditionFailed, StatusCode::PRECONDITION_FAILED); + STATIC_RESP!(PreconditionRequired, StatusCode::PRECONDITION_REQUIRED); STATIC_RESP!(PayloadTooLarge, StatusCode::PAYLOAD_TOO_LARGE); STATIC_RESP!(UriTooLong, StatusCode::URI_TOO_LONG); STATIC_RESP!(UnsupportedMediaType, StatusCode::UNSUPPORTED_MEDIA_TYPE); diff --git a/src/httpmessage.rs b/src/httpmessage.rs index c50de2e9..39aa1b68 100644 --- a/src/httpmessage.rs +++ b/src/httpmessage.rs @@ -25,7 +25,7 @@ pub trait HttpMessage: Sized { fn headers(&self) -> &HeaderMap; /// Message payload stream - fn payload(self) -> Self::Stream; + fn payload(&mut self) -> Option; #[doc(hidden)] /// Get a header @@ -128,7 +128,7 @@ pub trait HttpMessage: Sized { /// } /// # fn main() {} /// ``` - fn body(self) -> MessageBody { + fn body(&mut self) -> MessageBody { MessageBody::new(self) } @@ -162,7 +162,7 @@ pub trait HttpMessage: Sized { /// } /// # fn main() {} /// ``` - fn urlencoded(self) -> UrlEncoded { + fn urlencoded(&mut self) -> UrlEncoded { UrlEncoded::new(self) } @@ -198,19 +198,19 @@ pub trait HttpMessage: Sized { /// } /// # fn main() {} /// ``` - fn json(self) -> JsonBody { + fn json(&mut self) -> JsonBody { JsonBody::new(self) } /// Return stream of lines. - fn readlines(self) -> Readlines { + fn readlines(&mut self) -> Readlines { Readlines::new(self) } } /// Stream to read request line by line. pub struct Readlines { - stream: T::Stream, + stream: Option, buff: BytesMut, limit: usize, checked_buff: bool, @@ -220,7 +220,7 @@ pub struct Readlines { impl Readlines { /// Create a new stream to read request line by line. - fn new(req: T) -> Self { + fn new(req: &mut T) -> Self { let encoding = match req.encoding() { Ok(enc) => enc, Err(err) => return Self::err(req, err.into()), @@ -242,7 +242,7 @@ impl Readlines { self } - fn err(req: T, err: ReadlinesError) -> Self { + fn err(req: &mut T, err: ReadlinesError) -> Self { Readlines { stream: req.payload(), buff: BytesMut::new(), @@ -292,61 +292,65 @@ impl Stream for Readlines { self.checked_buff = true; } // poll req for more bytes - match self.stream.poll() { - Ok(Async::Ready(Some(mut bytes))) => { - // check if there is a newline in bytes - let mut found: Option = None; - for (ind, b) in bytes.iter().enumerate() { - if *b == b'\n' { - found = Some(ind); - break; + if let Some(ref mut stream) = self.stream { + match stream.poll() { + Ok(Async::Ready(Some(mut bytes))) => { + // check if there is a newline in bytes + let mut found: Option = None; + for (ind, b) in bytes.iter().enumerate() { + if *b == b'\n' { + found = Some(ind); + break; + } } + if let Some(ind) = found { + // check if line is longer than limit + if ind + 1 > self.limit { + return Err(ReadlinesError::LimitOverflow); + } + let enc: *const Encoding = self.encoding as *const Encoding; + let line = if enc == UTF_8 { + str::from_utf8(&bytes.split_to(ind + 1)) + .map_err(|_| ReadlinesError::EncodingError)? + .to_owned() + } else { + self.encoding + .decode(&bytes.split_to(ind + 1), DecoderTrap::Strict) + .map_err(|_| ReadlinesError::EncodingError)? + }; + // extend buffer with rest of the bytes; + self.buff.extend_from_slice(&bytes); + self.checked_buff = false; + return Ok(Async::Ready(Some(line))); + } + self.buff.extend_from_slice(&bytes); + Ok(Async::NotReady) } - if let Some(ind) = found { - // check if line is longer than limit - if ind + 1 > self.limit { + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(None)) => { + if self.buff.is_empty() { + return Ok(Async::Ready(None)); + } + if self.buff.len() > self.limit { return Err(ReadlinesError::LimitOverflow); } let enc: *const Encoding = self.encoding as *const Encoding; let line = if enc == UTF_8 { - str::from_utf8(&bytes.split_to(ind + 1)) + str::from_utf8(&self.buff) .map_err(|_| ReadlinesError::EncodingError)? .to_owned() } else { self.encoding - .decode(&bytes.split_to(ind + 1), DecoderTrap::Strict) + .decode(&self.buff, DecoderTrap::Strict) .map_err(|_| ReadlinesError::EncodingError)? }; - // extend buffer with rest of the bytes; - self.buff.extend_from_slice(&bytes); - self.checked_buff = false; - return Ok(Async::Ready(Some(line))); + self.buff.clear(); + Ok(Async::Ready(Some(line))) } - self.buff.extend_from_slice(&bytes); - Ok(Async::NotReady) + Err(e) => Err(ReadlinesError::from(e)), } - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(None)) => { - if self.buff.is_empty() { - return Ok(Async::Ready(None)); - } - if self.buff.len() > self.limit { - return Err(ReadlinesError::LimitOverflow); - } - let enc: *const Encoding = self.encoding as *const Encoding; - let line = if enc == UTF_8 { - str::from_utf8(&self.buff) - .map_err(|_| ReadlinesError::EncodingError)? - .to_owned() - } else { - self.encoding - .decode(&self.buff, DecoderTrap::Strict) - .map_err(|_| ReadlinesError::EncodingError)? - }; - self.buff.clear(); - Ok(Async::Ready(Some(line))) - } - Err(e) => Err(ReadlinesError::from(e)), + } else { + Ok(Async::Ready(None)) } } } @@ -362,7 +366,7 @@ pub struct MessageBody { impl MessageBody { /// Create `MessageBody` for request. - pub fn new(req: T) -> MessageBody { + pub fn new(req: &mut T) -> MessageBody { let mut len = None; if let Some(l) = req.headers().get(header::CONTENT_LENGTH) { if let Ok(s) = l.to_str() { @@ -379,7 +383,7 @@ impl MessageBody { MessageBody { limit: 262_144, length: len, - stream: Some(req.payload()), + stream: req.payload(), fut: None, err: None, } @@ -424,6 +428,10 @@ where } } + if self.stream.is_none() { + return Ok(Async::Ready(Bytes::new())); + } + // future let limit = self.limit; self.fut = Some(Box::new( @@ -457,7 +465,7 @@ pub struct UrlEncoded { impl UrlEncoded { /// Create a new future to URL encode a request - pub fn new(req: T) -> UrlEncoded { + pub fn new(req: &mut T) -> UrlEncoded { // check content type if req.content_type().to_lowercase() != "application/x-www-form-urlencoded" { return Self::err(UrlencodedError::ContentType); @@ -482,7 +490,7 @@ impl UrlEncoded { UrlEncoded { encoding, - stream: Some(req.payload()), + stream: req.payload(), limit: 262_144, length: len, fut: None, @@ -694,7 +702,7 @@ mod tests { #[test] fn test_urlencoded_error() { - let req = TestRequest::with_header( + let mut req = TestRequest::with_header( header::CONTENT_TYPE, "application/x-www-form-urlencoded", ) @@ -705,7 +713,7 @@ mod tests { UrlencodedError::UnknownLength ); - let req = TestRequest::with_header( + let mut req = TestRequest::with_header( header::CONTENT_TYPE, "application/x-www-form-urlencoded", ) @@ -716,7 +724,7 @@ mod tests { UrlencodedError::Overflow ); - let req = TestRequest::with_header(header::CONTENT_TYPE, "text/plain") + let mut req = TestRequest::with_header(header::CONTENT_TYPE, "text/plain") .header(header::CONTENT_LENGTH, "10") .finish(); assert_eq!( @@ -727,7 +735,7 @@ mod tests { #[test] fn test_urlencoded() { - let req = TestRequest::with_header( + let mut req = TestRequest::with_header( header::CONTENT_TYPE, "application/x-www-form-urlencoded", ) @@ -743,7 +751,7 @@ mod tests { }) ); - let req = TestRequest::with_header( + let mut req = TestRequest::with_header( header::CONTENT_TYPE, "application/x-www-form-urlencoded; charset=utf-8", ) @@ -762,19 +770,20 @@ mod tests { #[test] fn test_message_body() { - let req = TestRequest::with_header(header::CONTENT_LENGTH, "xxxx").finish(); + let mut req = TestRequest::with_header(header::CONTENT_LENGTH, "xxxx").finish(); match req.body().poll().err().unwrap() { PayloadError::UnknownLength => (), _ => unreachable!("error"), } - let req = TestRequest::with_header(header::CONTENT_LENGTH, "1000000").finish(); + let mut req = + TestRequest::with_header(header::CONTENT_LENGTH, "1000000").finish(); match req.body().poll().err().unwrap() { PayloadError::Overflow => (), _ => unreachable!("error"), } - let req = TestRequest::default() + let mut req = TestRequest::default() .set_payload(Bytes::from_static(b"test")) .finish(); match req.body().poll().ok().unwrap() { @@ -782,7 +791,7 @@ mod tests { _ => unreachable!("error"), } - let req = TestRequest::default() + let mut req = TestRequest::default() .set_payload(Bytes::from_static(b"11111111111111")) .finish(); match req.body().limit(5).poll().err().unwrap() { @@ -793,14 +802,14 @@ mod tests { #[test] fn test_readlines() { - let req = TestRequest::default() + let mut req = TestRequest::default() .set_payload(Bytes::from_static( b"Lorem Ipsum is simply dummy text of the printing and typesetting\n\ industry. Lorem Ipsum has been the industry's standard dummy\n\ Contrary to popular belief, Lorem Ipsum is not simply random text.", )) .finish(); - let mut r = Readlines::new(req); + let mut r = Readlines::new(&mut req); match r.poll().ok().unwrap() { Async::Ready(Some(s)) => assert_eq!( s, diff --git a/src/json.rs b/src/json.rs index fc1ab4d2..6cd1e87a 100644 --- a/src/json.rs +++ b/src/json.rs @@ -50,7 +50,7 @@ pub struct JsonBody { impl JsonBody { /// Create `JsonBody` for request. - pub fn new(req: T) -> Self { + pub fn new(req: &mut T) -> Self { // check content-type let json = if let Ok(Some(mime)) = req.mime_type() { mime.subtype() == mime::JSON || mime.suffix() == Some(mime::JSON) @@ -79,7 +79,7 @@ impl JsonBody { JsonBody { limit: 262_144, length: len, - stream: Some(req.payload()), + stream: req.payload(), fut: None, err: None, } @@ -164,11 +164,11 @@ mod tests { #[test] fn test_json_body() { - let req = TestRequest::default().finish(); + let mut req = TestRequest::default().finish(); let mut json = req.json::(); assert_eq!(json.poll().err().unwrap(), JsonPayloadError::ContentType); - let req = TestRequest::default() + let mut req = TestRequest::default() .header( header::CONTENT_TYPE, header::HeaderValue::from_static("application/text"), @@ -177,7 +177,7 @@ mod tests { let mut json = req.json::(); assert_eq!(json.poll().err().unwrap(), JsonPayloadError::ContentType); - let req = TestRequest::default() + let mut req = TestRequest::default() .header( header::CONTENT_TYPE, header::HeaderValue::from_static("application/json"), @@ -190,7 +190,7 @@ mod tests { let mut json = req.json::().limit(100); assert_eq!(json.poll().err().unwrap(), JsonPayloadError::Overflow); - let req = TestRequest::default() + let mut req = TestRequest::default() .header( header::CONTENT_TYPE, header::HeaderValue::from_static("application/json"), diff --git a/src/lib.rs b/src/lib.rs index cdb4f038..a3ca52ed 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -78,7 +78,6 @@ mod httpcodes; mod httpmessage; mod json; mod message; -mod payload; mod request; mod response; mod service; @@ -111,7 +110,6 @@ pub mod dev { pub use crate::httpmessage::{MessageBody, Readlines, UrlEncoded}; pub use crate::json::JsonBody; - pub use crate::payload::{Payload, PayloadBuffer}; pub use crate::response::ResponseBuilder; } diff --git a/src/request.rs b/src/request.rs index b60a772e..4df95d45 100644 --- a/src/request.rs +++ b/src/request.rs @@ -10,7 +10,8 @@ use crate::error::PayloadError; use crate::extensions::Extensions; use crate::httpmessage::HttpMessage; use crate::message::{Message, MessagePool, RequestHead}; -use crate::payload::Payload; + +use crate::h1::Payload; /// Request pub struct Request

{ @@ -29,8 +30,8 @@ where } #[inline] - fn payload(mut self) -> P { - self.payload.take().unwrap() + fn payload(&mut self) -> Option

{ + self.payload.take() } } @@ -62,9 +63,9 @@ impl Request { } /// Take request's payload - pub fn take_payload(mut self) -> (Payload, Request<()>) { + pub fn take_payload(mut self) -> (Option, Request<()>) { ( - self.payload.take().unwrap(), + self.payload.take(), Request { payload: Some(()), inner: self.inner.clone(), diff --git a/src/service/mod.rs b/src/service/mod.rs index 83a40bd1..3939ab99 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -1,3 +1,5 @@ +use h2::RecvStream; + mod senderror; pub use self::senderror::{SendError, SendResponse}; diff --git a/src/test.rs b/src/test.rs index 852dd3c0..c68bbf8e 100644 --- a/src/test.rs +++ b/src/test.rs @@ -6,8 +6,8 @@ use cookie::Cookie; use http::header::HeaderName; use http::{HeaderMap, HttpTryFrom, Method, Uri, Version}; +use crate::h1::Payload; use crate::header::{Header, IntoHeaderValue}; -use crate::payload::Payload; use crate::Request; /// Test `Request` builder diff --git a/tests/test_client.rs b/tests/test_client.rs index 606bac22..6f502b0a 100644 --- a/tests/test_client.rs +++ b/tests/test_client.rs @@ -47,7 +47,7 @@ fn test_h1_v2() { assert!(repr.contains("ClientRequest")); assert!(repr.contains("x-test")); - let response = srv.block_on(request.send(&mut connector)).unwrap(); + let mut response = srv.block_on(request.send(&mut connector)).unwrap(); assert!(response.status().is_success()); // read response @@ -55,7 +55,7 @@ fn test_h1_v2() { assert_eq!(bytes, Bytes::from_static(STR.as_ref())); let request = srv.post().finish().unwrap(); - let response = srv.block_on(request.send(&mut connector)).unwrap(); + let mut response = srv.block_on(request.send(&mut connector)).unwrap(); assert!(response.status().is_success()); // read response diff --git a/tests/test_server.rs b/tests/test_server.rs index 9fa27e71..53db3840 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -89,7 +89,7 @@ fn test_h2_body() -> std::io::Result<()> { .map_err(|e| println!("Openssl error: {}", e)) .and_then( h2::H2Service::build() - .finish(|req: Request<_>| { + .finish(|mut req: Request<_>| { req.body() .limit(1024 * 1024) .and_then(|body| Ok(Response::Ok().body(body))) @@ -101,7 +101,7 @@ fn test_h2_body() -> std::io::Result<()> { let req = client::ClientRequest::get(srv.surl("/")) .body(data.clone()) .unwrap(); - let response = srv.send_request(req).unwrap(); + let mut response = srv.send_request(req).unwrap(); assert!(response.status().is_success()); let body = srv.block_on(response.body().limit(1024 * 1024)).unwrap(); @@ -350,7 +350,7 @@ fn test_headers() { let req = srv.get().finish().unwrap(); - let response = srv.block_on(req.send(&mut connector)).unwrap(); + let mut response = srv.block_on(req.send(&mut connector)).unwrap(); assert!(response.status().is_success()); // read response @@ -387,7 +387,7 @@ fn test_body() { }); let req = srv.get().finish().unwrap(); - let response = srv.send_request(req).unwrap(); + let mut response = srv.send_request(req).unwrap(); assert!(response.status().is_success()); // read response @@ -402,7 +402,7 @@ fn test_head_empty() { }); let req = client::ClientRequest::head(srv.url("/")).finish().unwrap(); - let response = srv.send_request(req).unwrap(); + let mut response = srv.send_request(req).unwrap(); assert!(response.status().is_success()); { @@ -428,7 +428,7 @@ fn test_head_binary() { }); let req = client::ClientRequest::head(srv.url("/")).finish().unwrap(); - let response = srv.send_request(req).unwrap(); + let mut response = srv.send_request(req).unwrap(); assert!(response.status().is_success()); { @@ -477,7 +477,7 @@ fn test_body_length() { }); let req = srv.get().finish().unwrap(); - let response = srv.send_request(req).unwrap(); + let mut response = srv.send_request(req).unwrap(); assert!(response.status().is_success()); // read response @@ -496,7 +496,7 @@ fn test_body_chunked_explicit() { }); let req = srv.get().finish().unwrap(); - let response = srv.send_request(req).unwrap(); + let mut response = srv.send_request(req).unwrap(); assert!(response.status().is_success()); // read response @@ -517,7 +517,7 @@ fn test_body_chunked_implicit() { }); let req = srv.get().finish().unwrap(); - let response = srv.send_request(req).unwrap(); + let mut response = srv.send_request(req).unwrap(); assert!(response.status().is_success()); // read response @@ -540,7 +540,7 @@ fn test_response_http_error_handling() { }); let req = srv.get().finish().unwrap(); - let response = srv.send_request(req).unwrap(); + let mut response = srv.send_request(req).unwrap(); assert_eq!(response.status(), http::StatusCode::INTERNAL_SERVER_ERROR); // read response diff --git a/tests/test_ws.rs b/tests/test_ws.rs index 07f857d7..bf5a3c41 100644 --- a/tests/test_ws.rs +++ b/tests/test_ws.rs @@ -5,7 +5,6 @@ use actix_http_test::TestServer; use actix_service::NewService; use actix_utils::framed::IntoFramed; use actix_utils::stream::TakeItem; -use actix_web::ws as web_ws; use bytes::{Bytes, BytesMut}; use futures::future::{lazy, ok, Either}; use futures::{Future, Sink, Stream}; @@ -105,37 +104,4 @@ fn test_simple() { item, Some(ws::Frame::Close(Some(ws::CloseCode::Normal.into()))) ); - - { - let mut sys = actix_web::actix::System::new("test"); - let url = srv.url("/"); - - let (reader, mut writer) = sys - .block_on(lazy(|| web_ws::Client::new(url).connect())) - .unwrap(); - - writer.text("text"); - let (item, reader) = sys.block_on(reader.into_future()).unwrap(); - assert_eq!(item, Some(web_ws::Message::Text("text".to_owned()))); - - writer.binary(b"text".as_ref()); - let (item, reader) = sys.block_on(reader.into_future()).unwrap(); - assert_eq!( - item, - Some(web_ws::Message::Binary(Bytes::from_static(b"text").into())) - ); - - writer.ping("ping"); - let (item, reader) = sys.block_on(reader.into_future()).unwrap(); - assert_eq!(item, Some(web_ws::Message::Pong("ping".to_owned()))); - - writer.close(Some(web_ws::CloseCode::Normal.into())); - let (item, _) = sys.block_on(reader.into_future()).unwrap(); - assert_eq!( - item, - Some(web_ws::Message::Close(Some( - web_ws::CloseCode::Normal.into() - ))) - ); - } }