diff --git a/Cargo.toml b/Cargo.toml index 8cc74446e..23938f2fa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -75,7 +75,6 @@ openssl = { version="0.10", optional = true } [dev-dependencies] actix-rt = "0.1.0" -actix-web = "0.7" actix-server = { version="0.2", features=["ssl"] } actix-connector = { version="0.2.0", features=["ssl"] } actix-utils = "0.2.1" diff --git a/examples/echo.rs b/examples/echo.rs index 3bfb04d7c..03d5b4706 100644 --- a/examples/echo.rs +++ b/examples/echo.rs @@ -18,8 +18,8 @@ fn main() { .client_timeout(1000) .client_disconnect(1000) .server_hostname("localhost") - .finish(|_req: Request| { - _req.body().limit(512).and_then(|bytes: Bytes| { + .finish(|mut req: Request| { + req.body().limit(512).and_then(|bytes: Bytes| { info!("request body: {:?}", bytes); let mut res = Response::Ok(); res.header("x-head", HeaderValue::from_static("dummy value!")); diff --git a/examples/echo2.rs b/examples/echo2.rs index 0e2bc9d52..2fd9cbcff 100644 --- a/examples/echo2.rs +++ b/examples/echo2.rs @@ -8,8 +8,8 @@ use futures::Future; use log::info; use std::env; -fn handle_request(_req: Request) -> impl Future { - _req.body().limit(512).from_err().and_then(|bytes: Bytes| { +fn handle_request(mut req: Request) -> impl Future { + req.body().limit(512).from_err().and_then(|bytes: Bytes| { info!("request body: {:?}", bytes); let mut res = Response::Ok(); res.header("x-head", HeaderValue::from_static("dummy value!")); diff --git a/src/client/h1proto.rs b/src/client/h1proto.rs index 86fc10b84..59a03ef48 100644 --- a/src/client/h1proto.rs +++ b/src/client/h1proto.rs @@ -50,14 +50,14 @@ where .into_future() .map_err(|(e, _)| SendRequestError::from(e)) .and_then(|(item, framed)| { - if let Some(res) = item { + if let Some(mut res) = item { match framed.get_codec().message_type() { h1::MessageType::None => { let force_close = !framed.get_codec().keepalive(); release_connection(framed, force_close) } _ => { - *res.payload.borrow_mut() = Some(Payload::stream(framed)) + res.set_payload(Payload::stream(framed)); } } ok(res) @@ -199,27 +199,10 @@ where } } -struct EmptyPayload; - -impl Stream for EmptyPayload { - type Item = Bytes; - type Error = PayloadError; - - fn poll(&mut self) -> Poll, Self::Error> { - Ok(Async::Ready(None)) - } -} - pub(crate) struct Payload { framed: Option>, } -impl Payload<()> { - pub fn empty() -> PayloadStream { - Box::new(EmptyPayload) - } -} - impl Payload { pub fn stream(framed: Framed) -> PayloadStream { Box::new(Payload { diff --git a/src/client/h2proto.rs b/src/client/h2proto.rs index ecd18cf82..f2f18d935 100644 --- a/src/client/h2proto.rs +++ b/src/client/h2proto.rs @@ -1,4 +1,3 @@ -use std::cell::RefCell; use std::time; use actix_codec::{AsyncRead, AsyncWrite}; @@ -111,7 +110,7 @@ where Ok(ClientResponse { head, - payload: RefCell::new(Some(Box::new(Payload::new(body)))), + payload: Some(Box::new(Payload::new(body))), }) }) .from_err() diff --git a/src/client/request.rs b/src/client/request.rs index b80f0e6dc..7e971756d 100644 --- a/src/client/request.rs +++ b/src/client/request.rs @@ -268,10 +268,9 @@ impl ClientRequestBuilder { /// /// ```rust /// # extern crate mime; - /// # extern crate actix_web; - /// # use actix_web::client::*; + /// # extern crate actix_http; /// # - /// use actix_web::{client, http}; + /// use actix_http::{client, http}; /// /// fn main() { /// let req = client::ClientRequest::build() @@ -299,16 +298,14 @@ impl ClientRequestBuilder { /// To override header use `set_header()` method. /// /// ```rust - /// # extern crate http; - /// # extern crate actix_web; - /// # use actix_web::client::*; + /// # extern crate actix_http; /// # - /// use http::header; + /// use actix_http::{client, http}; /// /// fn main() { - /// let req = ClientRequest::build() + /// let req = client::ClientRequest::build() /// .header("X-TEST", "value") - /// .header(header::CONTENT_TYPE, "application/json") + /// .header(http::header::CONTENT_TYPE, "application/json") /// .finish() /// .unwrap(); /// } @@ -427,8 +424,8 @@ impl ClientRequestBuilder { /// Set a cookie /// /// ```rust - /// # extern crate actix_web; - /// use actix_web::{client, http}; + /// # extern crate actix_http; + /// use actix_http::{client, http}; /// /// fn main() { /// let req = client::ClientRequest::build() diff --git a/src/client/response.rs b/src/client/response.rs index 6224d3cb5..65c59f2a6 100644 --- a/src/client/response.rs +++ b/src/client/response.rs @@ -1,4 +1,3 @@ -use std::cell::RefCell; use std::fmt; use bytes::Bytes; @@ -10,13 +9,11 @@ use crate::error::PayloadError; use crate::httpmessage::HttpMessage; use crate::message::{Head, ResponseHead}; -use super::h1proto::Payload; - /// Client Response #[derive(Default)] pub struct ClientResponse { pub(crate) head: ResponseHead, - pub(crate) payload: RefCell>, + pub(crate) payload: Option, } impl HttpMessage for ClientResponse { @@ -27,12 +24,8 @@ impl HttpMessage for ClientResponse { } #[inline] - fn payload(self) -> Self::Stream { - if let Some(payload) = self.payload.borrow_mut().take() { - payload - } else { - Payload::empty() - } + fn payload(&mut self) -> Option { + self.payload.take() } } @@ -41,7 +34,7 @@ impl ClientResponse { pub fn new() -> ClientResponse { ClientResponse { head: ResponseHead::default(), - payload: RefCell::new(None), + payload: None, } } @@ -84,6 +77,11 @@ impl ClientResponse { pub fn keep_alive(&self) -> bool { self.head().keep_alive() } + + /// Set response payload + pub fn set_payload(&mut self, payload: PayloadStream) { + self.payload = Some(payload); + } } impl Stream for ClientResponse { @@ -91,7 +89,7 @@ impl Stream for ClientResponse { type Error = PayloadError; fn poll(&mut self) -> Poll, Self::Error> { - if let Some(ref mut payload) = &mut *self.payload.borrow_mut() { + if let Some(ref mut payload) = self.payload { payload.poll() } else { Ok(Async::Ready(None)) diff --git a/src/error.rs b/src/error.rs index 03224b558..f71b429fd 100644 --- a/src/error.rs +++ b/src/error.rs @@ -344,7 +344,7 @@ pub enum PayloadError { UnknownLength, /// Http2 payload error #[display(fmt = "{}", _0)] - H2Payload(h2::Error), + Http2Payload(h2::Error), } impl From for PayloadError { @@ -642,6 +642,16 @@ where InternalError::new(err, StatusCode::UNAUTHORIZED).into() } +/// Helper function that creates wrapper of any error and generate +/// *PAYMENT_REQUIRED* response. +#[allow(non_snake_case)] +pub fn ErrorPaymentRequired(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::PAYMENT_REQUIRED).into() +} + /// Helper function that creates wrapper of any error and generate *FORBIDDEN* /// response. #[allow(non_snake_case)] @@ -672,6 +682,26 @@ where InternalError::new(err, StatusCode::METHOD_NOT_ALLOWED).into() } +/// Helper function that creates wrapper of any error and generate *NOT +/// ACCEPTABLE* response. +#[allow(non_snake_case)] +pub fn ErrorNotAcceptable(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::NOT_ACCEPTABLE).into() +} + +/// Helper function that creates wrapper of any error and generate *PROXY +/// AUTHENTICATION REQUIRED* response. +#[allow(non_snake_case)] +pub fn ErrorProxyAuthenticationRequired(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::PROXY_AUTHENTICATION_REQUIRED).into() +} + /// Helper function that creates wrapper of any error and generate *REQUEST /// TIMEOUT* response. #[allow(non_snake_case)] @@ -702,6 +732,116 @@ where InternalError::new(err, StatusCode::GONE).into() } +/// Helper function that creates wrapper of any error and generate *LENGTH +/// REQUIRED* response. +#[allow(non_snake_case)] +pub fn ErrorLengthRequired(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::LENGTH_REQUIRED).into() +} + +/// Helper function that creates wrapper of any error and generate +/// *PAYLOAD TOO LARGE* response. +#[allow(non_snake_case)] +pub fn ErrorPayloadTooLarge(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::PAYLOAD_TOO_LARGE).into() +} + +/// Helper function that creates wrapper of any error and generate +/// *URI TOO LONG* response. +#[allow(non_snake_case)] +pub fn ErrorUriTooLong(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::URI_TOO_LONG).into() +} + +/// Helper function that creates wrapper of any error and generate +/// *UNSUPPORTED MEDIA TYPE* response. +#[allow(non_snake_case)] +pub fn ErrorUnsupportedMediaType(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::UNSUPPORTED_MEDIA_TYPE).into() +} + +/// Helper function that creates wrapper of any error and generate +/// *RANGE NOT SATISFIABLE* response. +#[allow(non_snake_case)] +pub fn ErrorRangeNotSatisfiable(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::RANGE_NOT_SATISFIABLE).into() +} + +/// Helper function that creates wrapper of any error and generate +/// *IM A TEAPOT* response. +#[allow(non_snake_case)] +pub fn ErrorImATeapot(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::IM_A_TEAPOT).into() +} + +/// Helper function that creates wrapper of any error and generate +/// *MISDIRECTED REQUEST* response. +#[allow(non_snake_case)] +pub fn ErrorMisdirectedRequest(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::MISDIRECTED_REQUEST).into() +} + +/// Helper function that creates wrapper of any error and generate +/// *UNPROCESSABLE ENTITY* response. +#[allow(non_snake_case)] +pub fn ErrorUnprocessableEntity(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::UNPROCESSABLE_ENTITY).into() +} + +/// Helper function that creates wrapper of any error and generate +/// *LOCKED* response. +#[allow(non_snake_case)] +pub fn ErrorLocked(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::LOCKED).into() +} + +/// Helper function that creates wrapper of any error and generate +/// *FAILED DEPENDENCY* response. +#[allow(non_snake_case)] +pub fn ErrorFailedDependency(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::FAILED_DEPENDENCY).into() +} + +/// Helper function that creates wrapper of any error and generate +/// *UPGRADE REQUIRED* response. +#[allow(non_snake_case)] +pub fn ErrorUpgradeRequired(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::UPGRADE_REQUIRED).into() +} + /// Helper function that creates wrapper of any error and generate /// *PRECONDITION FAILED* response. #[allow(non_snake_case)] @@ -712,6 +852,46 @@ where InternalError::new(err, StatusCode::PRECONDITION_FAILED).into() } +/// Helper function that creates wrapper of any error and generate +/// *PRECONDITION REQUIRED* response. +#[allow(non_snake_case)] +pub fn ErrorPreconditionRequired(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::PRECONDITION_REQUIRED).into() +} + +/// Helper function that creates wrapper of any error and generate +/// *TOO MANY REQUESTS* response. +#[allow(non_snake_case)] +pub fn ErrorTooManyRequests(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::TOO_MANY_REQUESTS).into() +} + +/// Helper function that creates wrapper of any error and generate +/// *REQUEST HEADER FIELDS TOO LARGE* response. +#[allow(non_snake_case)] +pub fn ErrorRequestHeaderFieldsTooLarge(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE).into() +} + +/// Helper function that creates wrapper of any error and generate +/// *UNAVAILABLE FOR LEGAL REASONS* response. +#[allow(non_snake_case)] +pub fn ErrorUnavailableForLegalReasons(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::UNAVAILABLE_FOR_LEGAL_REASONS).into() +} + /// Helper function that creates wrapper of any error and generate /// *EXPECTATION FAILED* response. #[allow(non_snake_case)] @@ -772,6 +952,66 @@ where InternalError::new(err, StatusCode::GATEWAY_TIMEOUT).into() } +/// Helper function that creates wrapper of any error and +/// generate *HTTP VERSION NOT SUPPORTED* response. +#[allow(non_snake_case)] +pub fn ErrorHttpVersionNotSupported(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::HTTP_VERSION_NOT_SUPPORTED).into() +} + +/// Helper function that creates wrapper of any error and +/// generate *VARIANT ALSO NEGOTIATES* response. +#[allow(non_snake_case)] +pub fn ErrorVariantAlsoNegotiates(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::VARIANT_ALSO_NEGOTIATES).into() +} + +/// Helper function that creates wrapper of any error and +/// generate *INSUFFICIENT STORAGE* response. +#[allow(non_snake_case)] +pub fn ErrorInsufficientStorage(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::INSUFFICIENT_STORAGE).into() +} + +/// Helper function that creates wrapper of any error and +/// generate *LOOP DETECTED* response. +#[allow(non_snake_case)] +pub fn ErrorLoopDetected(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::LOOP_DETECTED).into() +} + +/// Helper function that creates wrapper of any error and +/// generate *NOT EXTENDED* response. +#[allow(non_snake_case)] +pub fn ErrorNotExtended(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::NOT_EXTENDED).into() +} + +/// Helper function that creates wrapper of any error and +/// generate *NETWORK AUTHENTICATION REQUIRED* response. +#[allow(non_snake_case)] +pub fn ErrorNetworkAuthenticationRequired(err: T) -> Error +where + T: fmt::Debug + fmt::Display + 'static, +{ + InternalError::new(err, StatusCode::NETWORK_AUTHENTICATION_REQUIRED).into() +} + #[cfg(test)] mod tests { use super::*; @@ -926,6 +1166,9 @@ mod tests { let r: Response = ErrorUnauthorized("err").into(); assert_eq!(r.status(), StatusCode::UNAUTHORIZED); + let r: Response = ErrorPaymentRequired("err").into(); + assert_eq!(r.status(), StatusCode::PAYMENT_REQUIRED); + let r: Response = ErrorForbidden("err").into(); assert_eq!(r.status(), StatusCode::FORBIDDEN); @@ -935,6 +1178,12 @@ mod tests { let r: Response = ErrorMethodNotAllowed("err").into(); assert_eq!(r.status(), StatusCode::METHOD_NOT_ALLOWED); + let r: Response = ErrorNotAcceptable("err").into(); + assert_eq!(r.status(), StatusCode::NOT_ACCEPTABLE); + + let r: Response = ErrorProxyAuthenticationRequired("err").into(); + assert_eq!(r.status(), StatusCode::PROXY_AUTHENTICATION_REQUIRED); + let r: Response = ErrorRequestTimeout("err").into(); assert_eq!(r.status(), StatusCode::REQUEST_TIMEOUT); @@ -944,12 +1193,57 @@ mod tests { let r: Response = ErrorGone("err").into(); assert_eq!(r.status(), StatusCode::GONE); + let r: Response = ErrorLengthRequired("err").into(); + assert_eq!(r.status(), StatusCode::LENGTH_REQUIRED); + let r: Response = ErrorPreconditionFailed("err").into(); assert_eq!(r.status(), StatusCode::PRECONDITION_FAILED); + let r: Response = ErrorPayloadTooLarge("err").into(); + assert_eq!(r.status(), StatusCode::PAYLOAD_TOO_LARGE); + + let r: Response = ErrorUriTooLong("err").into(); + assert_eq!(r.status(), StatusCode::URI_TOO_LONG); + + let r: Response = ErrorUnsupportedMediaType("err").into(); + assert_eq!(r.status(), StatusCode::UNSUPPORTED_MEDIA_TYPE); + + let r: Response = ErrorRangeNotSatisfiable("err").into(); + assert_eq!(r.status(), StatusCode::RANGE_NOT_SATISFIABLE); + let r: Response = ErrorExpectationFailed("err").into(); assert_eq!(r.status(), StatusCode::EXPECTATION_FAILED); + let r: Response = ErrorImATeapot("err").into(); + assert_eq!(r.status(), StatusCode::IM_A_TEAPOT); + + let r: Response = ErrorMisdirectedRequest("err").into(); + assert_eq!(r.status(), StatusCode::MISDIRECTED_REQUEST); + + let r: Response = ErrorUnprocessableEntity("err").into(); + assert_eq!(r.status(), StatusCode::UNPROCESSABLE_ENTITY); + + let r: Response = ErrorLocked("err").into(); + assert_eq!(r.status(), StatusCode::LOCKED); + + let r: Response = ErrorFailedDependency("err").into(); + assert_eq!(r.status(), StatusCode::FAILED_DEPENDENCY); + + let r: Response = ErrorUpgradeRequired("err").into(); + assert_eq!(r.status(), StatusCode::UPGRADE_REQUIRED); + + let r: Response = ErrorPreconditionRequired("err").into(); + assert_eq!(r.status(), StatusCode::PRECONDITION_REQUIRED); + + let r: Response = ErrorTooManyRequests("err").into(); + assert_eq!(r.status(), StatusCode::TOO_MANY_REQUESTS); + + let r: Response = ErrorRequestHeaderFieldsTooLarge("err").into(); + assert_eq!(r.status(), StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE); + + let r: Response = ErrorUnavailableForLegalReasons("err").into(); + assert_eq!(r.status(), StatusCode::UNAVAILABLE_FOR_LEGAL_REASONS); + let r: Response = ErrorInternalServerError("err").into(); assert_eq!(r.status(), StatusCode::INTERNAL_SERVER_ERROR); @@ -964,5 +1258,23 @@ mod tests { let r: Response = ErrorGatewayTimeout("err").into(); assert_eq!(r.status(), StatusCode::GATEWAY_TIMEOUT); + + let r: Response = ErrorHttpVersionNotSupported("err").into(); + assert_eq!(r.status(), StatusCode::HTTP_VERSION_NOT_SUPPORTED); + + let r: Response = ErrorVariantAlsoNegotiates("err").into(); + assert_eq!(r.status(), StatusCode::VARIANT_ALSO_NEGOTIATES); + + let r: Response = ErrorInsufficientStorage("err").into(); + assert_eq!(r.status(), StatusCode::INSUFFICIENT_STORAGE); + + let r: Response = ErrorLoopDetected("err").into(); + assert_eq!(r.status(), StatusCode::LOOP_DETECTED); + + let r: Response = ErrorNotExtended("err").into(); + assert_eq!(r.status(), StatusCode::NOT_EXTENDED); + + let r: Response = ErrorNetworkAuthenticationRequired("err").into(); + assert_eq!(r.status(), StatusCode::NETWORK_AUTHENTICATION_REQUIRED); } } diff --git a/src/h1/dispatcher.rs b/src/h1/dispatcher.rs index 1295dfddf..0a0ed04e2 100644 --- a/src/h1/dispatcher.rs +++ b/src/h1/dispatcher.rs @@ -14,11 +14,11 @@ use crate::body::{Body, BodyLength, MessageBody, ResponseBody}; use crate::config::ServiceConfig; use crate::error::DispatchError; use crate::error::{ParseError, PayloadError}; -use crate::payload::{Payload, PayloadSender, PayloadStatus, PayloadWriter}; use crate::request::Request; use crate::response::Response; use super::codec::Codec; +use super::payload::{Payload, PayloadSender, PayloadStatus, PayloadWriter}; use super::{H1ServiceResult, Message, MessageType}; const MAX_PIPELINED_MESSAGES: usize = 16; diff --git a/src/h1/mod.rs b/src/h1/mod.rs index a375b60d3..9054c2665 100644 --- a/src/h1/mod.rs +++ b/src/h1/mod.rs @@ -9,11 +9,13 @@ mod codec; mod decoder; mod dispatcher; mod encoder; +mod payload; mod service; pub use self::client::{ClientCodec, ClientPayloadCodec}; pub use self::codec::Codec; pub use self::dispatcher::Dispatcher; +pub use self::payload::{Payload, PayloadBuffer}; pub use self::service::{H1Service, H1ServiceHandler, OneRequest}; use crate::request::Request; diff --git a/src/payload.rs b/src/h1/payload.rs similarity index 100% rename from src/payload.rs rename to src/h1/payload.rs diff --git a/src/httpcodes.rs b/src/httpcodes.rs index 7806bd806..5dfeefa9e 100644 --- a/src/httpcodes.rs +++ b/src/httpcodes.rs @@ -54,6 +54,7 @@ impl Response { STATIC_RESP!(Gone, StatusCode::GONE); STATIC_RESP!(LengthRequired, StatusCode::LENGTH_REQUIRED); STATIC_RESP!(PreconditionFailed, StatusCode::PRECONDITION_FAILED); + STATIC_RESP!(PreconditionRequired, StatusCode::PRECONDITION_REQUIRED); STATIC_RESP!(PayloadTooLarge, StatusCode::PAYLOAD_TOO_LARGE); STATIC_RESP!(UriTooLong, StatusCode::URI_TOO_LONG); STATIC_RESP!(UnsupportedMediaType, StatusCode::UNSUPPORTED_MEDIA_TYPE); diff --git a/src/httpmessage.rs b/src/httpmessage.rs index c50de2e94..39aa1b689 100644 --- a/src/httpmessage.rs +++ b/src/httpmessage.rs @@ -25,7 +25,7 @@ pub trait HttpMessage: Sized { fn headers(&self) -> &HeaderMap; /// Message payload stream - fn payload(self) -> Self::Stream; + fn payload(&mut self) -> Option; #[doc(hidden)] /// Get a header @@ -128,7 +128,7 @@ pub trait HttpMessage: Sized { /// } /// # fn main() {} /// ``` - fn body(self) -> MessageBody { + fn body(&mut self) -> MessageBody { MessageBody::new(self) } @@ -162,7 +162,7 @@ pub trait HttpMessage: Sized { /// } /// # fn main() {} /// ``` - fn urlencoded(self) -> UrlEncoded { + fn urlencoded(&mut self) -> UrlEncoded { UrlEncoded::new(self) } @@ -198,19 +198,19 @@ pub trait HttpMessage: Sized { /// } /// # fn main() {} /// ``` - fn json(self) -> JsonBody { + fn json(&mut self) -> JsonBody { JsonBody::new(self) } /// Return stream of lines. - fn readlines(self) -> Readlines { + fn readlines(&mut self) -> Readlines { Readlines::new(self) } } /// Stream to read request line by line. pub struct Readlines { - stream: T::Stream, + stream: Option, buff: BytesMut, limit: usize, checked_buff: bool, @@ -220,7 +220,7 @@ pub struct Readlines { impl Readlines { /// Create a new stream to read request line by line. - fn new(req: T) -> Self { + fn new(req: &mut T) -> Self { let encoding = match req.encoding() { Ok(enc) => enc, Err(err) => return Self::err(req, err.into()), @@ -242,7 +242,7 @@ impl Readlines { self } - fn err(req: T, err: ReadlinesError) -> Self { + fn err(req: &mut T, err: ReadlinesError) -> Self { Readlines { stream: req.payload(), buff: BytesMut::new(), @@ -292,61 +292,65 @@ impl Stream for Readlines { self.checked_buff = true; } // poll req for more bytes - match self.stream.poll() { - Ok(Async::Ready(Some(mut bytes))) => { - // check if there is a newline in bytes - let mut found: Option = None; - for (ind, b) in bytes.iter().enumerate() { - if *b == b'\n' { - found = Some(ind); - break; + if let Some(ref mut stream) = self.stream { + match stream.poll() { + Ok(Async::Ready(Some(mut bytes))) => { + // check if there is a newline in bytes + let mut found: Option = None; + for (ind, b) in bytes.iter().enumerate() { + if *b == b'\n' { + found = Some(ind); + break; + } } + if let Some(ind) = found { + // check if line is longer than limit + if ind + 1 > self.limit { + return Err(ReadlinesError::LimitOverflow); + } + let enc: *const Encoding = self.encoding as *const Encoding; + let line = if enc == UTF_8 { + str::from_utf8(&bytes.split_to(ind + 1)) + .map_err(|_| ReadlinesError::EncodingError)? + .to_owned() + } else { + self.encoding + .decode(&bytes.split_to(ind + 1), DecoderTrap::Strict) + .map_err(|_| ReadlinesError::EncodingError)? + }; + // extend buffer with rest of the bytes; + self.buff.extend_from_slice(&bytes); + self.checked_buff = false; + return Ok(Async::Ready(Some(line))); + } + self.buff.extend_from_slice(&bytes); + Ok(Async::NotReady) } - if let Some(ind) = found { - // check if line is longer than limit - if ind + 1 > self.limit { + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(None)) => { + if self.buff.is_empty() { + return Ok(Async::Ready(None)); + } + if self.buff.len() > self.limit { return Err(ReadlinesError::LimitOverflow); } let enc: *const Encoding = self.encoding as *const Encoding; let line = if enc == UTF_8 { - str::from_utf8(&bytes.split_to(ind + 1)) + str::from_utf8(&self.buff) .map_err(|_| ReadlinesError::EncodingError)? .to_owned() } else { self.encoding - .decode(&bytes.split_to(ind + 1), DecoderTrap::Strict) + .decode(&self.buff, DecoderTrap::Strict) .map_err(|_| ReadlinesError::EncodingError)? }; - // extend buffer with rest of the bytes; - self.buff.extend_from_slice(&bytes); - self.checked_buff = false; - return Ok(Async::Ready(Some(line))); + self.buff.clear(); + Ok(Async::Ready(Some(line))) } - self.buff.extend_from_slice(&bytes); - Ok(Async::NotReady) + Err(e) => Err(ReadlinesError::from(e)), } - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(None)) => { - if self.buff.is_empty() { - return Ok(Async::Ready(None)); - } - if self.buff.len() > self.limit { - return Err(ReadlinesError::LimitOverflow); - } - let enc: *const Encoding = self.encoding as *const Encoding; - let line = if enc == UTF_8 { - str::from_utf8(&self.buff) - .map_err(|_| ReadlinesError::EncodingError)? - .to_owned() - } else { - self.encoding - .decode(&self.buff, DecoderTrap::Strict) - .map_err(|_| ReadlinesError::EncodingError)? - }; - self.buff.clear(); - Ok(Async::Ready(Some(line))) - } - Err(e) => Err(ReadlinesError::from(e)), + } else { + Ok(Async::Ready(None)) } } } @@ -362,7 +366,7 @@ pub struct MessageBody { impl MessageBody { /// Create `MessageBody` for request. - pub fn new(req: T) -> MessageBody { + pub fn new(req: &mut T) -> MessageBody { let mut len = None; if let Some(l) = req.headers().get(header::CONTENT_LENGTH) { if let Ok(s) = l.to_str() { @@ -379,7 +383,7 @@ impl MessageBody { MessageBody { limit: 262_144, length: len, - stream: Some(req.payload()), + stream: req.payload(), fut: None, err: None, } @@ -424,6 +428,10 @@ where } } + if self.stream.is_none() { + return Ok(Async::Ready(Bytes::new())); + } + // future let limit = self.limit; self.fut = Some(Box::new( @@ -457,7 +465,7 @@ pub struct UrlEncoded { impl UrlEncoded { /// Create a new future to URL encode a request - pub fn new(req: T) -> UrlEncoded { + pub fn new(req: &mut T) -> UrlEncoded { // check content type if req.content_type().to_lowercase() != "application/x-www-form-urlencoded" { return Self::err(UrlencodedError::ContentType); @@ -482,7 +490,7 @@ impl UrlEncoded { UrlEncoded { encoding, - stream: Some(req.payload()), + stream: req.payload(), limit: 262_144, length: len, fut: None, @@ -694,7 +702,7 @@ mod tests { #[test] fn test_urlencoded_error() { - let req = TestRequest::with_header( + let mut req = TestRequest::with_header( header::CONTENT_TYPE, "application/x-www-form-urlencoded", ) @@ -705,7 +713,7 @@ mod tests { UrlencodedError::UnknownLength ); - let req = TestRequest::with_header( + let mut req = TestRequest::with_header( header::CONTENT_TYPE, "application/x-www-form-urlencoded", ) @@ -716,7 +724,7 @@ mod tests { UrlencodedError::Overflow ); - let req = TestRequest::with_header(header::CONTENT_TYPE, "text/plain") + let mut req = TestRequest::with_header(header::CONTENT_TYPE, "text/plain") .header(header::CONTENT_LENGTH, "10") .finish(); assert_eq!( @@ -727,7 +735,7 @@ mod tests { #[test] fn test_urlencoded() { - let req = TestRequest::with_header( + let mut req = TestRequest::with_header( header::CONTENT_TYPE, "application/x-www-form-urlencoded", ) @@ -743,7 +751,7 @@ mod tests { }) ); - let req = TestRequest::with_header( + let mut req = TestRequest::with_header( header::CONTENT_TYPE, "application/x-www-form-urlencoded; charset=utf-8", ) @@ -762,19 +770,20 @@ mod tests { #[test] fn test_message_body() { - let req = TestRequest::with_header(header::CONTENT_LENGTH, "xxxx").finish(); + let mut req = TestRequest::with_header(header::CONTENT_LENGTH, "xxxx").finish(); match req.body().poll().err().unwrap() { PayloadError::UnknownLength => (), _ => unreachable!("error"), } - let req = TestRequest::with_header(header::CONTENT_LENGTH, "1000000").finish(); + let mut req = + TestRequest::with_header(header::CONTENT_LENGTH, "1000000").finish(); match req.body().poll().err().unwrap() { PayloadError::Overflow => (), _ => unreachable!("error"), } - let req = TestRequest::default() + let mut req = TestRequest::default() .set_payload(Bytes::from_static(b"test")) .finish(); match req.body().poll().ok().unwrap() { @@ -782,7 +791,7 @@ mod tests { _ => unreachable!("error"), } - let req = TestRequest::default() + let mut req = TestRequest::default() .set_payload(Bytes::from_static(b"11111111111111")) .finish(); match req.body().limit(5).poll().err().unwrap() { @@ -793,14 +802,14 @@ mod tests { #[test] fn test_readlines() { - let req = TestRequest::default() + let mut req = TestRequest::default() .set_payload(Bytes::from_static( b"Lorem Ipsum is simply dummy text of the printing and typesetting\n\ industry. Lorem Ipsum has been the industry's standard dummy\n\ Contrary to popular belief, Lorem Ipsum is not simply random text.", )) .finish(); - let mut r = Readlines::new(req); + let mut r = Readlines::new(&mut req); match r.poll().ok().unwrap() { Async::Ready(Some(s)) => assert_eq!( s, diff --git a/src/json.rs b/src/json.rs index fc1ab4d25..6cd1e87ab 100644 --- a/src/json.rs +++ b/src/json.rs @@ -50,7 +50,7 @@ pub struct JsonBody { impl JsonBody { /// Create `JsonBody` for request. - pub fn new(req: T) -> Self { + pub fn new(req: &mut T) -> Self { // check content-type let json = if let Ok(Some(mime)) = req.mime_type() { mime.subtype() == mime::JSON || mime.suffix() == Some(mime::JSON) @@ -79,7 +79,7 @@ impl JsonBody { JsonBody { limit: 262_144, length: len, - stream: Some(req.payload()), + stream: req.payload(), fut: None, err: None, } @@ -164,11 +164,11 @@ mod tests { #[test] fn test_json_body() { - let req = TestRequest::default().finish(); + let mut req = TestRequest::default().finish(); let mut json = req.json::(); assert_eq!(json.poll().err().unwrap(), JsonPayloadError::ContentType); - let req = TestRequest::default() + let mut req = TestRequest::default() .header( header::CONTENT_TYPE, header::HeaderValue::from_static("application/text"), @@ -177,7 +177,7 @@ mod tests { let mut json = req.json::(); assert_eq!(json.poll().err().unwrap(), JsonPayloadError::ContentType); - let req = TestRequest::default() + let mut req = TestRequest::default() .header( header::CONTENT_TYPE, header::HeaderValue::from_static("application/json"), @@ -190,7 +190,7 @@ mod tests { let mut json = req.json::().limit(100); assert_eq!(json.poll().err().unwrap(), JsonPayloadError::Overflow); - let req = TestRequest::default() + let mut req = TestRequest::default() .header( header::CONTENT_TYPE, header::HeaderValue::from_static("application/json"), diff --git a/src/lib.rs b/src/lib.rs index cdb4f0382..a3ca52eda 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -78,7 +78,6 @@ mod httpcodes; mod httpmessage; mod json; mod message; -mod payload; mod request; mod response; mod service; @@ -111,7 +110,6 @@ pub mod dev { pub use crate::httpmessage::{MessageBody, Readlines, UrlEncoded}; pub use crate::json::JsonBody; - pub use crate::payload::{Payload, PayloadBuffer}; pub use crate::response::ResponseBuilder; } diff --git a/src/request.rs b/src/request.rs index b60a772e1..4df95d450 100644 --- a/src/request.rs +++ b/src/request.rs @@ -10,7 +10,8 @@ use crate::error::PayloadError; use crate::extensions::Extensions; use crate::httpmessage::HttpMessage; use crate::message::{Message, MessagePool, RequestHead}; -use crate::payload::Payload; + +use crate::h1::Payload; /// Request pub struct Request

{ @@ -29,8 +30,8 @@ where } #[inline] - fn payload(mut self) -> P { - self.payload.take().unwrap() + fn payload(&mut self) -> Option

{ + self.payload.take() } } @@ -62,9 +63,9 @@ impl Request { } /// Take request's payload - pub fn take_payload(mut self) -> (Payload, Request<()>) { + pub fn take_payload(mut self) -> (Option, Request<()>) { ( - self.payload.take().unwrap(), + self.payload.take(), Request { payload: Some(()), inner: self.inner.clone(), diff --git a/src/service/mod.rs b/src/service/mod.rs index 83a40bd12..3939ab99c 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -1,3 +1,5 @@ +use h2::RecvStream; + mod senderror; pub use self::senderror::{SendError, SendResponse}; diff --git a/src/test.rs b/src/test.rs index 852dd3c0b..c68bbf8e5 100644 --- a/src/test.rs +++ b/src/test.rs @@ -6,8 +6,8 @@ use cookie::Cookie; use http::header::HeaderName; use http::{HeaderMap, HttpTryFrom, Method, Uri, Version}; +use crate::h1::Payload; use crate::header::{Header, IntoHeaderValue}; -use crate::payload::Payload; use crate::Request; /// Test `Request` builder diff --git a/tests/test_client.rs b/tests/test_client.rs index 606bac22a..6f502b0af 100644 --- a/tests/test_client.rs +++ b/tests/test_client.rs @@ -47,7 +47,7 @@ fn test_h1_v2() { assert!(repr.contains("ClientRequest")); assert!(repr.contains("x-test")); - let response = srv.block_on(request.send(&mut connector)).unwrap(); + let mut response = srv.block_on(request.send(&mut connector)).unwrap(); assert!(response.status().is_success()); // read response @@ -55,7 +55,7 @@ fn test_h1_v2() { assert_eq!(bytes, Bytes::from_static(STR.as_ref())); let request = srv.post().finish().unwrap(); - let response = srv.block_on(request.send(&mut connector)).unwrap(); + let mut response = srv.block_on(request.send(&mut connector)).unwrap(); assert!(response.status().is_success()); // read response diff --git a/tests/test_server.rs b/tests/test_server.rs index 9fa27e71b..53db38403 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -89,7 +89,7 @@ fn test_h2_body() -> std::io::Result<()> { .map_err(|e| println!("Openssl error: {}", e)) .and_then( h2::H2Service::build() - .finish(|req: Request<_>| { + .finish(|mut req: Request<_>| { req.body() .limit(1024 * 1024) .and_then(|body| Ok(Response::Ok().body(body))) @@ -101,7 +101,7 @@ fn test_h2_body() -> std::io::Result<()> { let req = client::ClientRequest::get(srv.surl("/")) .body(data.clone()) .unwrap(); - let response = srv.send_request(req).unwrap(); + let mut response = srv.send_request(req).unwrap(); assert!(response.status().is_success()); let body = srv.block_on(response.body().limit(1024 * 1024)).unwrap(); @@ -350,7 +350,7 @@ fn test_headers() { let req = srv.get().finish().unwrap(); - let response = srv.block_on(req.send(&mut connector)).unwrap(); + let mut response = srv.block_on(req.send(&mut connector)).unwrap(); assert!(response.status().is_success()); // read response @@ -387,7 +387,7 @@ fn test_body() { }); let req = srv.get().finish().unwrap(); - let response = srv.send_request(req).unwrap(); + let mut response = srv.send_request(req).unwrap(); assert!(response.status().is_success()); // read response @@ -402,7 +402,7 @@ fn test_head_empty() { }); let req = client::ClientRequest::head(srv.url("/")).finish().unwrap(); - let response = srv.send_request(req).unwrap(); + let mut response = srv.send_request(req).unwrap(); assert!(response.status().is_success()); { @@ -428,7 +428,7 @@ fn test_head_binary() { }); let req = client::ClientRequest::head(srv.url("/")).finish().unwrap(); - let response = srv.send_request(req).unwrap(); + let mut response = srv.send_request(req).unwrap(); assert!(response.status().is_success()); { @@ -477,7 +477,7 @@ fn test_body_length() { }); let req = srv.get().finish().unwrap(); - let response = srv.send_request(req).unwrap(); + let mut response = srv.send_request(req).unwrap(); assert!(response.status().is_success()); // read response @@ -496,7 +496,7 @@ fn test_body_chunked_explicit() { }); let req = srv.get().finish().unwrap(); - let response = srv.send_request(req).unwrap(); + let mut response = srv.send_request(req).unwrap(); assert!(response.status().is_success()); // read response @@ -517,7 +517,7 @@ fn test_body_chunked_implicit() { }); let req = srv.get().finish().unwrap(); - let response = srv.send_request(req).unwrap(); + let mut response = srv.send_request(req).unwrap(); assert!(response.status().is_success()); // read response @@ -540,7 +540,7 @@ fn test_response_http_error_handling() { }); let req = srv.get().finish().unwrap(); - let response = srv.send_request(req).unwrap(); + let mut response = srv.send_request(req).unwrap(); assert_eq!(response.status(), http::StatusCode::INTERNAL_SERVER_ERROR); // read response diff --git a/tests/test_ws.rs b/tests/test_ws.rs index 07f857d7a..bf5a3c41e 100644 --- a/tests/test_ws.rs +++ b/tests/test_ws.rs @@ -5,7 +5,6 @@ use actix_http_test::TestServer; use actix_service::NewService; use actix_utils::framed::IntoFramed; use actix_utils::stream::TakeItem; -use actix_web::ws as web_ws; use bytes::{Bytes, BytesMut}; use futures::future::{lazy, ok, Either}; use futures::{Future, Sink, Stream}; @@ -105,37 +104,4 @@ fn test_simple() { item, Some(ws::Frame::Close(Some(ws::CloseCode::Normal.into()))) ); - - { - let mut sys = actix_web::actix::System::new("test"); - let url = srv.url("/"); - - let (reader, mut writer) = sys - .block_on(lazy(|| web_ws::Client::new(url).connect())) - .unwrap(); - - writer.text("text"); - let (item, reader) = sys.block_on(reader.into_future()).unwrap(); - assert_eq!(item, Some(web_ws::Message::Text("text".to_owned()))); - - writer.binary(b"text".as_ref()); - let (item, reader) = sys.block_on(reader.into_future()).unwrap(); - assert_eq!( - item, - Some(web_ws::Message::Binary(Bytes::from_static(b"text").into())) - ); - - writer.ping("ping"); - let (item, reader) = sys.block_on(reader.into_future()).unwrap(); - assert_eq!(item, Some(web_ws::Message::Pong("ping".to_owned()))); - - writer.close(Some(web_ws::CloseCode::Normal.into())); - let (item, _) = sys.block_on(reader.into_future()).unwrap(); - assert_eq!( - item, - Some(web_ws::Message::Close(Some( - web_ws::CloseCode::Normal.into() - ))) - ); - } }