1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-24 16:02:59 +01:00

use buffer capacity; remove unused imports

This commit is contained in:
Nikolay Kim 2018-02-26 15:26:27 -08:00
parent 72aa2d9eae
commit d6fd4a3524
10 changed files with 87 additions and 103 deletions

View File

@ -259,20 +259,7 @@ fn index(mut req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>> {
## Streaming request ## Streaming request
*HttpRequest* is a stream of `Bytes` objects. It could be used to read request *HttpRequest* is a stream of `Bytes` objects. It could be used to read request
body payload. At the same time actix uses body payload.
[*Payload*](../actix_web/payload/struct.Payload.html) object.
*HttpRequest* provides several methods, which can be used for
payload access.At the same time *Payload* implements *Stream* trait, so it
could be used with various stream combinators. Also *Payload* provides
several convenience methods that return future object that resolve to Bytes object.
* *readexactly()* method returns *Future* that resolves when specified number of bytes
get received.
* *readline()* method returns *Future* that resolves when `\n` get received.
* *readuntil()* method returns *Future* that resolves when specified bytes string
matches in input bytes stream
In this example handle reads request payload chunk by chunk and prints every chunk. In this example handle reads request payload chunk by chunk and prints every chunk.

View File

@ -1,19 +1,14 @@
#![allow(unused_imports, dead_code)]
use std::{io, time}; use std::{io, time};
use std::net::{SocketAddr, Shutdown}; use std::net::Shutdown;
use std::collections::VecDeque;
use std::time::Duration;
use actix::{fut, Actor, ActorFuture, Arbiter, Context, use actix::{fut, Actor, ActorFuture, Context,
Handler, Message, ActorResponse, Supervised}; Handler, Message, ActorResponse, Supervised};
use actix::registry::ArbiterService; use actix::registry::ArbiterService;
use actix::fut::WrapFuture; use actix::fut::WrapFuture;
use actix::actors::{Connector, ConnectorError, Connect as ResolveConnect}; use actix::actors::{Connector, ConnectorError, Connect as ResolveConnect};
use http::{Uri, HttpTryFrom, Error as HttpError}; use http::{Uri, HttpTryFrom, Error as HttpError};
use futures::{Async, Future, Poll}; use futures::Poll;
use tokio_core::reactor::Timeout;
use tokio_core::net::{TcpStream, TcpStreamNew};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
#[cfg(feature="alpn")] #[cfg(feature="alpn")]

View File

@ -26,6 +26,7 @@ pub struct ClientRequest {
upgrade: bool, upgrade: bool,
encoding: ContentEncoding, encoding: ContentEncoding,
response_decompress: bool, response_decompress: bool,
buffer_capacity: Option<(usize, usize)>,
} }
impl Default for ClientRequest { impl Default for ClientRequest {
@ -41,6 +42,7 @@ impl Default for ClientRequest {
upgrade: false, upgrade: false,
encoding: ContentEncoding::Auto, encoding: ContentEncoding::Auto,
response_decompress: true, response_decompress: true,
buffer_capacity: None,
} }
} }
} }
@ -167,6 +169,10 @@ impl ClientRequest {
self.response_decompress self.response_decompress
} }
pub fn buffer_capacity(&self) -> Option<(usize, usize)> {
self.buffer_capacity
}
/// Get body os this response /// Get body os this response
#[inline] #[inline]
pub fn body(&self) -> &Body { pub fn body(&self) -> &Body {
@ -434,6 +440,16 @@ impl ClientRequestBuilder {
self self
} }
/// Set write buffer capacity
pub fn buffer_capacity(&mut self,
low_watermark: usize,
high_watermark: usize) -> &mut Self
{
if let Some(parts) = parts(&mut self.request, &self.err) {
parts.buffer_capacity = Some((low_watermark, high_watermark));
}
self
}
/// This method calls provided closure with builder reference if value is true. /// This method calls provided closure with builder reference if value is true.
pub fn if_true<F>(&mut self, value: bool, f: F) -> &mut Self pub fn if_true<F>(&mut self, value: bool, f: F) -> &mut Self

View File

@ -1,5 +1,4 @@
#![cfg_attr(feature = "cargo-clippy", allow(redundant_field_names))] #![cfg_attr(feature = "cargo-clippy", allow(redundant_field_names))]
#![allow(dead_code)]
use std::io::{self, Write}; use std::io::{self, Write};
use std::cell::RefCell; use std::cell::RefCell;
@ -67,9 +66,9 @@ impl HttpClientWriter {
self.buffer.take(); self.buffer.take();
} }
pub fn keepalive(&self) -> bool { // pub fn keepalive(&self) -> bool {
self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE) // self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE)
} // }
/// Set write buffer capacity /// Set write buffer capacity
pub fn set_buffer_capacity(&mut self, low_watermark: usize, high_watermark: usize) { pub fn set_buffer_capacity(&mut self, low_watermark: usize, high_watermark: usize) {
@ -107,6 +106,9 @@ impl HttpClientWriter {
// prepare task // prepare task
self.flags.insert(Flags::STARTED); self.flags.insert(Flags::STARTED);
self.encoder = content_encoder(self.buffer.clone(), msg); self.encoder = content_encoder(self.buffer.clone(), msg);
if let Some(capacity) = msg.buffer_capacity() {
self.set_buffer_capacity(capacity.0, capacity.1);
}
// render message // render message
{ {

View File

@ -436,26 +436,6 @@ impl<S> HttpRequest<S> {
} }
} }
/// Returns reference to the associated http payload.
#[inline]
pub fn payload(&self) -> &Payload {
let msg = self.as_mut();
if msg.payload.is_none() {
msg.payload = Some(Payload::empty());
}
msg.payload.as_ref().unwrap()
}
/// Returns mutable reference to the associated http payload.
#[inline]
pub fn payload_mut(&mut self) -> &mut Payload {
let msg = self.as_mut();
if msg.payload.is_none() {
msg.payload = Some(Payload::empty());
}
msg.payload.as_mut().unwrap()
}
/// Load request body. /// Load request body.
/// ///
/// By default only 256Kb payload reads to a memory, then `BAD REQUEST` /// By default only 256Kb payload reads to a memory, then `BAD REQUEST`
@ -589,6 +569,24 @@ impl<S> HttpRequest<S> {
pub fn json<T: DeserializeOwned>(self) -> JsonBody<S, T> { pub fn json<T: DeserializeOwned>(self) -> JsonBody<S, T> {
JsonBody::from_request(self) JsonBody::from_request(self)
} }
#[cfg(test)]
pub(crate) fn payload(&self) -> &Payload {
let msg = self.as_mut();
if msg.payload.is_none() {
msg.payload = Some(Payload::empty());
}
msg.payload.as_ref().unwrap()
}
#[cfg(test)]
pub(crate) fn payload_mut(&mut self) -> &mut Payload {
let msg = self.as_mut();
if msg.payload.is_none() {
msg.payload = Some(Payload::empty());
}
msg.payload.as_mut().unwrap()
}
} }
impl Default for HttpRequest<()> { impl Default for HttpRequest<()> {
@ -610,36 +608,45 @@ impl<S> Stream for HttpRequest<S> {
type Error = PayloadError; type Error = PayloadError;
fn poll(&mut self) -> Poll<Option<Bytes>, PayloadError> { fn poll(&mut self) -> Poll<Option<Bytes>, PayloadError> {
self.payload_mut().poll() let msg = self.as_mut();
if msg.payload.is_none() {
Ok(Async::Ready(None))
} else {
msg.payload.as_mut().unwrap().poll()
}
} }
} }
impl<S> io::Read for HttpRequest<S> { impl<S> io::Read for HttpRequest<S> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match self.payload_mut().poll() { if self.as_mut().payload.is_some() {
Ok(Async::Ready(Some(mut b))) => { match self.as_mut().payload.as_mut().unwrap().poll() {
let i = cmp::min(b.len(), buf.len()); Ok(Async::Ready(Some(mut b))) => {
buf.copy_from_slice(&b.split_to(i)[..i]); let i = cmp::min(b.len(), buf.len());
buf.copy_from_slice(&b.split_to(i)[..i]);
if !b.is_empty() { if !b.is_empty() {
self.payload_mut().unread_data(b); self.as_mut().payload.as_mut().unwrap().unread_data(b);
} }
if i < buf.len() { if i < buf.len() {
match self.read(&mut buf[i..]) { match self.read(&mut buf[i..]) {
Ok(n) => Ok(i + n), Ok(n) => Ok(i + n),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(i), Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(i),
Err(e) => Err(e), Err(e) => Err(e),
}
} else {
Ok(i)
} }
} else {
Ok(i)
} }
Ok(Async::Ready(None)) => Ok(0),
Ok(Async::NotReady) =>
Err(io::Error::new(io::ErrorKind::WouldBlock, "Not ready")),
Err(e) =>
Err(io::Error::new(io::ErrorKind::Other, failure::Error::from(e).compat())),
} }
Ok(Async::Ready(None)) => Ok(0), } else {
Ok(Async::NotReady) => Ok(0)
Err(io::Error::new(io::ErrorKind::WouldBlock, "Not ready")),
Err(e) =>
Err(io::Error::new(io::ErrorKind::Other, failure::Error::from(e).compat())),
} }
} }
} }

View File

@ -100,6 +100,7 @@ extern crate tokio_openssl;
mod application; mod application;
mod body; mod body;
mod context; mod context;
mod handler;
mod helpers; mod helpers;
mod httprequest; mod httprequest;
mod httpresponse; mod httpresponse;
@ -107,9 +108,9 @@ mod info;
mod json; mod json;
mod route; mod route;
mod router; mod router;
mod param;
mod resource; mod resource;
mod handler; mod param;
mod payload;
mod pipeline; mod pipeline;
pub mod client; pub mod client;
@ -121,7 +122,6 @@ pub mod multipart;
pub mod middleware; pub mod middleware;
pub mod pred; pub mod pred;
pub mod test; pub mod test;
pub mod payload;
pub mod server; pub mod server;
pub use error::{Error, Result, ResponseError}; pub use error::{Error, Result, ResponseError};
pub use body::{Body, Binary}; pub use body::{Body, Binary};

View File

@ -1,6 +1,3 @@
#![allow(dead_code, unused_imports, unused_variables)]
use std::any::Any;
use std::rc::Rc; use std::rc::Rc;
use std::sync::Arc; use std::sync::Arc;
use std::marker::PhantomData; use std::marker::PhantomData;
@ -49,8 +46,7 @@ impl<S> RequestSession for HttpRequest<S> {
return Session(s.0.as_mut()) return Session(s.0.as_mut())
} }
} }
//Session(&mut DUMMY) Session(unsafe{&mut DUMMY})
unreachable!()
} }
} }
@ -195,15 +191,13 @@ pub trait SessionBackend<S>: Sized + 'static {
/// Dummy session impl, does not do anything /// Dummy session impl, does not do anything
struct DummySessionImpl; struct DummySessionImpl;
static DUMMY: DummySessionImpl = DummySessionImpl; static mut DUMMY: DummySessionImpl = DummySessionImpl;
impl SessionImpl for DummySessionImpl { impl SessionImpl for DummySessionImpl {
fn get(&self, key: &str) -> Option<&str> { fn get(&self, _: &str) -> Option<&str> { None }
None fn set(&mut self, _: &str, _: String) {}
} fn remove(&mut self, _: &str) {}
fn set(&mut self, key: &str, value: String) {}
fn remove(&mut self, key: &str) {}
fn clear(&mut self) {} fn clear(&mut self) {}
fn write(&self, resp: HttpResponse) -> Result<Response> { fn write(&self, resp: HttpResponse) -> Result<Response> {
Ok(Response::Done(resp)) Ok(Response::Done(resp))

View File

@ -441,14 +441,6 @@ impl<S> PayloadHelper<S> where S: Stream<Item=Bytes, Error=PayloadError> {
}) })
} }
pub fn len(&self) -> usize {
self.len
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn readany(&mut self) -> Poll<Option<Bytes>, PayloadError> { pub fn readany(&mut self) -> Poll<Option<Bytes>, PayloadError> {
if let Some(data) = self.items.pop_front() { if let Some(data) = self.items.pop_front() {
self.len -= data.len(); self.len -= data.len();
@ -569,6 +561,7 @@ impl<S> PayloadHelper<S> where S: Stream<Item=Bytes, Error=PayloadError> {
self.items.push_front(data); self.items.push_front(data);
} }
#[allow(dead_code)]
pub fn remaining(&mut self) -> Bytes { pub fn remaining(&mut self) -> Bytes {
self.items.iter_mut() self.items.iter_mut()
.fold(BytesMut::new(), |mut b, c| { .fold(BytesMut::new(), |mut b, c| {

View File

@ -1,21 +1,17 @@
//! Http client request //! Http client request
#![allow(unused_imports, dead_code)]
use std::{fmt, io, str}; use std::{fmt, io, str};
use std::rc::Rc; use std::rc::Rc;
use std::time::Duration;
use std::cell::UnsafeCell; use std::cell::UnsafeCell;
use base64; use base64;
use rand; use rand;
use bytes::Bytes;
use cookie::Cookie; use cookie::Cookie;
use bytes::{Bytes, BytesMut};
use http::{HttpTryFrom, StatusCode, Error as HttpError}; use http::{HttpTryFrom, StatusCode, Error as HttpError};
use http::header::{self, HeaderName, HeaderValue}; use http::header::{self, HeaderName, HeaderValue};
use sha1::Sha1; use sha1::Sha1;
use futures::{Async, Future, Poll, Stream}; use futures::{Async, Future, Poll, Stream};
use futures::future::{Either, err as FutErr};
use futures::unsync::mpsc::{unbounded, UnboundedSender}; use futures::unsync::mpsc::{unbounded, UnboundedSender};
use tokio_core::net::TcpStream;
use byteorder::{ByteOrder, NetworkEndian}; use byteorder::{ByteOrder, NetworkEndian};
use actix::prelude::*; use actix::prelude::*;
@ -23,13 +19,10 @@ use actix::prelude::*;
use body::{Body, Binary}; use body::{Body, Binary};
use error::{WsError, UrlParseError}; use error::{WsError, UrlParseError};
use payload::PayloadHelper; use payload::PayloadHelper;
use server::shared::SharedBytes;
use server::{utils, IoStream};
use client::{ClientRequest, ClientRequestBuilder, ClientResponse, use client::{ClientRequest, ClientRequestBuilder, ClientResponse,
HttpResponseParser, HttpResponseParserError, HttpClientWriter}; ClientConnector, SendRequest, SendRequestError,
use client::{Connect, Connection, ClientConnector, ClientConnectorError, HttpResponseParserError};
SendRequest, SendRequestError};
use super::Message; use super::Message;
use super::frame::Frame; use super::frame::Frame;
@ -224,7 +217,6 @@ struct WsInner {
} }
pub struct WsHandshake { pub struct WsHandshake {
inner: Option<WsInner>,
request: Option<SendRequest>, request: Option<SendRequest>,
tx: Option<UnboundedSender<Bytes>>, tx: Option<UnboundedSender<Bytes>>,
key: String, key: String,
@ -254,7 +246,6 @@ impl WsHandshake {
WsHandshake { WsHandshake {
key, key,
inner: None,
request: Some(request.with_connector(conn.clone())), request: Some(request.with_connector(conn.clone())),
tx: Some(tx), tx: Some(tx),
error: err, error: err,
@ -262,7 +253,6 @@ impl WsHandshake {
} else { } else {
WsHandshake { WsHandshake {
key, key,
inner: None,
request: None, request: None,
tx: None, tx: None,
error: err, error: err,

View File

@ -91,7 +91,7 @@ pub fn start<A, S>(req: HttpRequest<S>, actor: A) -> Result<HttpResponse, Error>
S: 'static S: 'static
{ {
let mut resp = handshake(&req)?; let mut resp = handshake(&req)?;
let stream = WsStream::new(req.payload().clone()); let stream = WsStream::new(req.clone());
let mut ctx = WebsocketContext::new(req, actor); let mut ctx = WebsocketContext::new(req, actor);
ctx.add_message_stream(stream); ctx.add_message_stream(stream);