diff --git a/guide/src/qs_7.md b/guide/src/qs_7.md index 2b063f57c..66ffcb633 100644 --- a/guide/src/qs_7.md +++ b/guide/src/qs_7.md @@ -259,20 +259,7 @@ fn index(mut req: HttpRequest) -> Box> { ## Streaming request *HttpRequest* is a stream of `Bytes` objects. It could be used to read request -body payload. At the same time actix uses -[*Payload*](../actix_web/payload/struct.Payload.html) object. -*HttpRequest* provides several methods, which can be used for -payload access.At the same time *Payload* implements *Stream* trait, so it -could be used with various stream combinators. Also *Payload* provides -several convenience methods that return future object that resolve to Bytes object. - -* *readexactly()* method returns *Future* that resolves when specified number of bytes - get received. - -* *readline()* method returns *Future* that resolves when `\n` get received. - -* *readuntil()* method returns *Future* that resolves when specified bytes string - matches in input bytes stream +body payload. In this example handle reads request payload chunk by chunk and prints every chunk. diff --git a/src/client/connector.rs b/src/client/connector.rs index 7acd4ed28..5f27b8265 100644 --- a/src/client/connector.rs +++ b/src/client/connector.rs @@ -1,19 +1,14 @@ -#![allow(unused_imports, dead_code)] use std::{io, time}; -use std::net::{SocketAddr, Shutdown}; -use std::collections::VecDeque; -use std::time::Duration; +use std::net::Shutdown; -use actix::{fut, Actor, ActorFuture, Arbiter, Context, +use actix::{fut, Actor, ActorFuture, Context, Handler, Message, ActorResponse, Supervised}; use actix::registry::ArbiterService; use actix::fut::WrapFuture; use actix::actors::{Connector, ConnectorError, Connect as ResolveConnect}; use http::{Uri, HttpTryFrom, Error as HttpError}; -use futures::{Async, Future, Poll}; -use tokio_core::reactor::Timeout; -use tokio_core::net::{TcpStream, TcpStreamNew}; +use futures::Poll; use tokio_io::{AsyncRead, AsyncWrite}; #[cfg(feature="alpn")] diff --git a/src/client/request.rs b/src/client/request.rs index fd1d40c5f..392ef6b9d 100644 --- a/src/client/request.rs +++ b/src/client/request.rs @@ -26,6 +26,7 @@ pub struct ClientRequest { upgrade: bool, encoding: ContentEncoding, response_decompress: bool, + buffer_capacity: Option<(usize, usize)>, } impl Default for ClientRequest { @@ -41,6 +42,7 @@ impl Default for ClientRequest { upgrade: false, encoding: ContentEncoding::Auto, response_decompress: true, + buffer_capacity: None, } } } @@ -167,6 +169,10 @@ impl ClientRequest { self.response_decompress } + pub fn buffer_capacity(&self) -> Option<(usize, usize)> { + self.buffer_capacity + } + /// Get body os this response #[inline] pub fn body(&self) -> &Body { @@ -434,6 +440,16 @@ impl ClientRequestBuilder { self } + /// Set write buffer capacity + pub fn buffer_capacity(&mut self, + low_watermark: usize, + high_watermark: usize) -> &mut Self + { + if let Some(parts) = parts(&mut self.request, &self.err) { + parts.buffer_capacity = Some((low_watermark, high_watermark)); + } + self + } /// This method calls provided closure with builder reference if value is true. pub fn if_true(&mut self, value: bool, f: F) -> &mut Self diff --git a/src/client/writer.rs b/src/client/writer.rs index f072ea7f4..f67bd7261 100644 --- a/src/client/writer.rs +++ b/src/client/writer.rs @@ -1,5 +1,4 @@ #![cfg_attr(feature = "cargo-clippy", allow(redundant_field_names))] -#![allow(dead_code)] use std::io::{self, Write}; use std::cell::RefCell; @@ -67,9 +66,9 @@ impl HttpClientWriter { self.buffer.take(); } - pub fn keepalive(&self) -> bool { - self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE) - } + // pub fn keepalive(&self) -> bool { + // self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE) + // } /// Set write buffer capacity pub fn set_buffer_capacity(&mut self, low_watermark: usize, high_watermark: usize) { @@ -107,6 +106,9 @@ impl HttpClientWriter { // prepare task self.flags.insert(Flags::STARTED); self.encoder = content_encoder(self.buffer.clone(), msg); + if let Some(capacity) = msg.buffer_capacity() { + self.set_buffer_capacity(capacity.0, capacity.1); + } // render message { diff --git a/src/httprequest.rs b/src/httprequest.rs index 3e6000b54..aa8df4f59 100644 --- a/src/httprequest.rs +++ b/src/httprequest.rs @@ -436,26 +436,6 @@ impl HttpRequest { } } - /// Returns reference to the associated http payload. - #[inline] - pub fn payload(&self) -> &Payload { - let msg = self.as_mut(); - if msg.payload.is_none() { - msg.payload = Some(Payload::empty()); - } - msg.payload.as_ref().unwrap() - } - - /// Returns mutable reference to the associated http payload. - #[inline] - pub fn payload_mut(&mut self) -> &mut Payload { - let msg = self.as_mut(); - if msg.payload.is_none() { - msg.payload = Some(Payload::empty()); - } - msg.payload.as_mut().unwrap() - } - /// Load request body. /// /// By default only 256Kb payload reads to a memory, then `BAD REQUEST` @@ -589,6 +569,24 @@ impl HttpRequest { pub fn json(self) -> JsonBody { JsonBody::from_request(self) } + + #[cfg(test)] + pub(crate) fn payload(&self) -> &Payload { + let msg = self.as_mut(); + if msg.payload.is_none() { + msg.payload = Some(Payload::empty()); + } + msg.payload.as_ref().unwrap() + } + + #[cfg(test)] + pub(crate) fn payload_mut(&mut self) -> &mut Payload { + let msg = self.as_mut(); + if msg.payload.is_none() { + msg.payload = Some(Payload::empty()); + } + msg.payload.as_mut().unwrap() + } } impl Default for HttpRequest<()> { @@ -610,36 +608,45 @@ impl Stream for HttpRequest { type Error = PayloadError; fn poll(&mut self) -> Poll, PayloadError> { - self.payload_mut().poll() + let msg = self.as_mut(); + if msg.payload.is_none() { + Ok(Async::Ready(None)) + } else { + msg.payload.as_mut().unwrap().poll() + } } } impl io::Read for HttpRequest { fn read(&mut self, buf: &mut [u8]) -> io::Result { - match self.payload_mut().poll() { - Ok(Async::Ready(Some(mut b))) => { - let i = cmp::min(b.len(), buf.len()); - buf.copy_from_slice(&b.split_to(i)[..i]); + if self.as_mut().payload.is_some() { + match self.as_mut().payload.as_mut().unwrap().poll() { + Ok(Async::Ready(Some(mut b))) => { + let i = cmp::min(b.len(), buf.len()); + buf.copy_from_slice(&b.split_to(i)[..i]); - if !b.is_empty() { - self.payload_mut().unread_data(b); - } - - if i < buf.len() { - match self.read(&mut buf[i..]) { - Ok(n) => Ok(i + n), - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(i), - Err(e) => Err(e), + if !b.is_empty() { + self.as_mut().payload.as_mut().unwrap().unread_data(b); + } + + if i < buf.len() { + match self.read(&mut buf[i..]) { + Ok(n) => Ok(i + n), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(i), + Err(e) => Err(e), + } + } else { + Ok(i) } - } else { - Ok(i) } + Ok(Async::Ready(None)) => Ok(0), + Ok(Async::NotReady) => + Err(io::Error::new(io::ErrorKind::WouldBlock, "Not ready")), + Err(e) => + Err(io::Error::new(io::ErrorKind::Other, failure::Error::from(e).compat())), } - Ok(Async::Ready(None)) => Ok(0), - Ok(Async::NotReady) => - Err(io::Error::new(io::ErrorKind::WouldBlock, "Not ready")), - Err(e) => - Err(io::Error::new(io::ErrorKind::Other, failure::Error::from(e).compat())), + } else { + Ok(0) } } } diff --git a/src/lib.rs b/src/lib.rs index eb0da3c3c..6221afb90 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -100,6 +100,7 @@ extern crate tokio_openssl; mod application; mod body; mod context; +mod handler; mod helpers; mod httprequest; mod httpresponse; @@ -107,9 +108,9 @@ mod info; mod json; mod route; mod router; -mod param; mod resource; -mod handler; +mod param; +mod payload; mod pipeline; pub mod client; @@ -121,7 +122,6 @@ pub mod multipart; pub mod middleware; pub mod pred; pub mod test; -pub mod payload; pub mod server; pub use error::{Error, Result, ResponseError}; pub use body::{Body, Binary}; diff --git a/src/middleware/session.rs b/src/middleware/session.rs index b46cd49ec..7cdb7e093 100644 --- a/src/middleware/session.rs +++ b/src/middleware/session.rs @@ -1,6 +1,3 @@ -#![allow(dead_code, unused_imports, unused_variables)] - -use std::any::Any; use std::rc::Rc; use std::sync::Arc; use std::marker::PhantomData; @@ -49,8 +46,7 @@ impl RequestSession for HttpRequest { return Session(s.0.as_mut()) } } - //Session(&mut DUMMY) - unreachable!() + Session(unsafe{&mut DUMMY}) } } @@ -195,15 +191,13 @@ pub trait SessionBackend: Sized + 'static { /// Dummy session impl, does not do anything struct DummySessionImpl; -static DUMMY: DummySessionImpl = DummySessionImpl; +static mut DUMMY: DummySessionImpl = DummySessionImpl; impl SessionImpl for DummySessionImpl { - fn get(&self, key: &str) -> Option<&str> { - None - } - fn set(&mut self, key: &str, value: String) {} - fn remove(&mut self, key: &str) {} + fn get(&self, _: &str) -> Option<&str> { None } + fn set(&mut self, _: &str, _: String) {} + fn remove(&mut self, _: &str) {} fn clear(&mut self) {} fn write(&self, resp: HttpResponse) -> Result { Ok(Response::Done(resp)) diff --git a/src/payload.rs b/src/payload.rs index 664004d7b..b193cf646 100644 --- a/src/payload.rs +++ b/src/payload.rs @@ -441,14 +441,6 @@ impl PayloadHelper where S: Stream { }) } - pub fn len(&self) -> usize { - self.len - } - - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - pub fn readany(&mut self) -> Poll, PayloadError> { if let Some(data) = self.items.pop_front() { self.len -= data.len(); @@ -569,6 +561,7 @@ impl PayloadHelper where S: Stream { self.items.push_front(data); } + #[allow(dead_code)] pub fn remaining(&mut self) -> Bytes { self.items.iter_mut() .fold(BytesMut::new(), |mut b, c| { diff --git a/src/ws/client.rs b/src/ws/client.rs index 4b2a3b444..19e2543ca 100644 --- a/src/ws/client.rs +++ b/src/ws/client.rs @@ -1,21 +1,17 @@ //! Http client request -#![allow(unused_imports, dead_code)] use std::{fmt, io, str}; use std::rc::Rc; -use std::time::Duration; use std::cell::UnsafeCell; use base64; use rand; +use bytes::Bytes; use cookie::Cookie; -use bytes::{Bytes, BytesMut}; use http::{HttpTryFrom, StatusCode, Error as HttpError}; use http::header::{self, HeaderName, HeaderValue}; use sha1::Sha1; use futures::{Async, Future, Poll, Stream}; -use futures::future::{Either, err as FutErr}; use futures::unsync::mpsc::{unbounded, UnboundedSender}; -use tokio_core::net::TcpStream; use byteorder::{ByteOrder, NetworkEndian}; use actix::prelude::*; @@ -23,13 +19,10 @@ use actix::prelude::*; use body::{Body, Binary}; use error::{WsError, UrlParseError}; use payload::PayloadHelper; -use server::shared::SharedBytes; -use server::{utils, IoStream}; use client::{ClientRequest, ClientRequestBuilder, ClientResponse, - HttpResponseParser, HttpResponseParserError, HttpClientWriter}; -use client::{Connect, Connection, ClientConnector, ClientConnectorError, - SendRequest, SendRequestError}; + ClientConnector, SendRequest, SendRequestError, + HttpResponseParserError}; use super::Message; use super::frame::Frame; @@ -224,7 +217,6 @@ struct WsInner { } pub struct WsHandshake { - inner: Option, request: Option, tx: Option>, key: String, @@ -254,7 +246,6 @@ impl WsHandshake { WsHandshake { key, - inner: None, request: Some(request.with_connector(conn.clone())), tx: Some(tx), error: err, @@ -262,7 +253,6 @@ impl WsHandshake { } else { WsHandshake { key, - inner: None, request: None, tx: None, error: err, diff --git a/src/ws/mod.rs b/src/ws/mod.rs index 465400ac9..ef9eaf32e 100644 --- a/src/ws/mod.rs +++ b/src/ws/mod.rs @@ -91,7 +91,7 @@ pub fn start(req: HttpRequest, actor: A) -> Result S: 'static { let mut resp = handshake(&req)?; - let stream = WsStream::new(req.payload().clone()); + let stream = WsStream::new(req.clone()); let mut ctx = WebsocketContext::new(req, actor); ctx.add_message_stream(stream);