1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-01-22 14:55:56 +01:00

refactor payload

This commit is contained in:
Nikolay Kim 2017-10-08 20:16:48 -07:00
parent e398694bdb
commit e2dc775e21
12 changed files with 257 additions and 153 deletions

View File

@ -48,8 +48,7 @@ impl Actor for MyRoute {
impl Route for MyRoute {
type State = ();
fn request(req: HttpRequest, payload: Option<Payload>,
ctx: &mut HttpContext<Self>) -> Reply<Self>
fn request(req: HttpRequest, payload: Payload, ctx: &mut HttpContext<Self>) -> Reply<Self>
{
Reply::with(req, httpcodes::HTTPOk)
}

View File

@ -5,9 +5,10 @@ use std::collections::HashMap;
use route_recognizer::Router;
use task::Task;
use route::{Payload, RouteHandler};
use route::RouteHandler;
use router::Handler;
use resource::Resource;
use payload::Payload;
use httpmessage::HttpRequest;
@ -93,7 +94,7 @@ struct InnerApplication<S> {
impl<S: 'static> Handler for InnerApplication<S> {
fn handle(&self, req: HttpRequest, payload: Option<Payload>) -> Task {
fn handle(&self, req: HttpRequest, payload: Payload) -> Task {
if let Ok(h) = self.router.recognize(req.path()) {
h.handler.handle(req.with_params(h.params), payload, Rc::clone(&self.state))
} else {

View File

@ -4,7 +4,8 @@ use std::rc::Rc;
use http::StatusCode;
use task::Task;
use route::{Payload, RouteHandler};
use route::RouteHandler;
use payload::Payload;
use httpmessage::{Body, HttpRequest, HttpResponse, IntoHttpResponse};
pub const HTTPOk: StaticResponse = StaticResponse(StatusCode::OK);
@ -25,8 +26,7 @@ impl StaticResponse {
}
impl<S> RouteHandler<S> for StaticResponse {
fn handle(&self, req: HttpRequest, _: Option<Payload>, _: Rc<S>) -> Task
{
fn handle(&self, req: HttpRequest, _: Payload, _: Rc<S>) -> Task {
Task::reply(HttpResponse::new(req, self.0, Body::Empty))
}
}

View File

@ -26,6 +26,7 @@ mod error;
mod date;
mod decode;
mod httpmessage;
mod payload;
mod resource;
mod route;
mod router;
@ -40,9 +41,10 @@ mod wsproto;
pub mod httpcodes;
pub use application::Application;
pub use httpmessage::{HttpRequest, HttpResponse, IntoHttpResponse};
pub use payload::{Payload, PayloadItem};
pub use router::RoutingMap;
pub use resource::{Reply, Resource};
pub use route::{Route, RouteFactory, RouteHandler, Payload, PayloadItem};
pub use route::{Route, RouteFactory, RouteHandler};
pub use server::HttpServer;
pub use context::HttpContext;
pub use route_recognizer::Params;

View File

@ -19,12 +19,9 @@ impl Actor for MyRoute {
impl Route for MyRoute {
type State = ();
fn request(req: HttpRequest,
payload: Option<Payload>,
ctx: &mut HttpContext<Self>) -> Reply<Self>
{
if let Some(pl) = payload {
ctx.add_stream(pl);
fn request(req: HttpRequest, payload: Payload, ctx: &mut HttpContext<Self>) -> Reply<Self> {
if !payload.eof() {
ctx.add_stream(payload);
Reply::stream(MyRoute{req: Some(req)})
} else {
Reply::with(req, httpcodes::HTTPOk)
@ -37,7 +34,7 @@ impl ResponseType<PayloadItem> for MyRoute {
type Error = ();
}
impl StreamHandler<PayloadItem, ()> for MyRoute {}
impl StreamHandler<PayloadItem> for MyRoute {}
impl Handler<PayloadItem> for MyRoute {
fn handle(&mut self, msg: PayloadItem, ctx: &mut HttpContext<Self>)
@ -48,7 +45,6 @@ impl Handler<PayloadItem> for MyRoute {
ctx.start(httpcodes::HTTPOk.response(req));
ctx.write_eof();
}
Self::empty()
}
}
@ -62,22 +58,15 @@ impl Actor for MyWS {
impl Route for MyWS {
type State = ();
fn request(req: HttpRequest,
payload: Option<Payload>,
ctx: &mut HttpContext<Self>) -> Reply<Self>
{
if let Some(payload) = payload {
match ws::handshake(req) {
Ok(resp) => {
ctx.start(resp);
ctx.add_stream(ws::WsStream::new(payload));
Reply::stream(MyWS{})
},
Err(err) =>
Reply::reply(err)
}
} else {
Reply::with(req, httpcodes::HTTPBadRequest)
fn request(req: HttpRequest, payload: Payload, ctx: &mut HttpContext<Self>) -> Reply<Self> {
match ws::handshake(req) {
Ok(resp) => {
ctx.start(resp);
ctx.add_stream(ws::WsStream::new(payload));
Reply::stream(MyWS{})
},
Err(err) =>
Reply::reply(err)
}
}
}

180
src/payload.rs Normal file
View File

@ -0,0 +1,180 @@
use std::rc::{Rc, Weak};
use std::cell::RefCell;
use std::collections::VecDeque;
use bytes::Bytes;
use futures::{Async, Poll, Stream};
use futures::task::{Task, current as current_task};
pub type PayloadItem = Bytes;
const MAX_PAYLOAD_SIZE: usize = 65_536; // max buffer size 64k
/// Stream of byte chunks
///
/// Payload stores chunks in vector. First chunk can be received with `.readany()` method.
pub struct Payload {
inner: Rc<RefCell<Inner>>,
}
impl Payload {
pub(crate) fn new(eof: bool) -> (PayloadSender, Payload) {
let shared = Rc::new(RefCell::new(Inner::new(eof)));
(PayloadSender{inner: Rc::downgrade(&shared)},
Payload{inner: shared})
}
/// Indicates paused state of the payload. If payload data is not consumed
/// it get paused. Max size of not consumed data is 64k
pub fn paused(&self) -> bool {
self.inner.borrow().paused()
}
/// Indicates EOF of payload
pub fn eof(&self) -> bool {
self.inner.borrow().eof()
}
/// Length of the data in this payload
pub fn len(&self) -> usize {
self.inner.borrow().len()
}
/// Is payload empty
pub fn is_empty(&self) -> bool {
self.inner.borrow().len() == 0
}
/// Get any chunk of data
pub fn readany(&mut self) -> Async<Option<PayloadItem>> {
self.inner.borrow_mut().readany()
}
/// Put unused data back to payload
pub fn unread_data(&mut self, data: PayloadItem) {
self.inner.borrow_mut().unread_data(data);
}
}
impl Stream for Payload {
type Item = PayloadItem;
type Error = ();
fn poll(&mut self) -> Poll<Option<PayloadItem>, ()> {
Ok(self.readany())
}
}
pub(crate) struct PayloadSender {
inner: Weak<RefCell<Inner>>,
}
impl PayloadSender {
pub(crate) fn feed_eof(&mut self) {
if let Some(shared) = self.inner.upgrade() {
shared.borrow_mut().feed_eof()
}
}
pub(crate) fn feed_data(&mut self, data: Bytes) {
if let Some(shared) = self.inner.upgrade() {
shared.borrow_mut().feed_data(data)
}
}
pub(crate) fn maybe_paused(&self) -> bool {
match self.inner.upgrade() {
Some(shared) => {
let inner = shared.borrow();
if inner.paused() && inner.len() < MAX_PAYLOAD_SIZE {
drop(inner);
shared.borrow_mut().resume();
false
} else if !inner.paused() && inner.len() > MAX_PAYLOAD_SIZE {
drop(inner);
shared.borrow_mut().pause();
true
} else {
inner.paused()
}
}
None => false,
}
}
}
struct Inner {
len: usize,
eof: bool,
paused: bool,
task: Option<Task>,
items: VecDeque<Bytes>,
}
impl Inner {
fn new(eof: bool) -> Self {
Inner {
len: 0,
eof: eof,
paused: false,
task: None,
items: VecDeque::new(),
}
}
fn paused(&self) -> bool {
self.paused
}
fn pause(&mut self) {
self.paused = true;
}
fn resume(&mut self) {
self.paused = false;
}
fn feed_eof(&mut self) {
self.eof = true;
if let Some(task) = self.task.take() {
task.notify()
}
}
fn feed_data(&mut self, data: Bytes) {
self.len += data.len();
self.items.push_back(data);
if let Some(task) = self.task.take() {
task.notify()
}
}
fn eof(&self) -> bool {
self.eof
}
fn len(&self) -> usize {
self.len
}
fn readany(&mut self) -> Async<Option<Bytes>> {
if let Some(data) = self.items.pop_front() {
self.len -= data.len();
Async::Ready(Some(data))
} else if self.eof {
Async::Ready(None)
} else {
self.task = Some(current_task());
Async::NotReady
}
}
pub fn unread_data(&mut self, data: Bytes) {
self.len += data.len();
self.items.push_front(data)
}
}

View File

@ -3,26 +3,23 @@ use std::{self, fmt, io, ptr};
use httparse;
use http::{Method, Version, Uri, HttpTryFrom};
use bytes::{Bytes, BytesMut, BufMut};
use futures::{Async, AsyncSink, Poll, Sink};
use futures::unsync::mpsc::{channel, Sender};
use futures::{Async, Poll};
use tokio_io::AsyncRead;
use hyper::header::{Headers, ContentLength};
use {Payload, PayloadItem};
use error::{Error, Result};
use decode::Decoder;
use payload::{Payload, PayloadSender};
use httpmessage::{Message, HttpRequest};
const MAX_HEADERS: usize = 100;
const INIT_BUFFER_SIZE: usize = 8192;
pub const MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100;
const MAX_BUFFER_SIZE: usize = 131_072;
struct PayloadInfo {
tx: Sender<PayloadItem>,
tx: PayloadSender,
decoder: Decoder,
tmp_item: Option<PayloadItem>,
}
pub struct Reader {
@ -61,48 +58,17 @@ impl Reader {
fn decode(&mut self) -> std::result::Result<Decoding, Error>
{
if let Some(ref mut payload) = self.payload {
if payload.tx.maybe_paused() {
return Ok(Decoding::Paused)
}
loop {
if let Some(item) = payload.tmp_item.take() {
let eof = item.is_eof();
match payload.tx.start_send(item) {
Ok(AsyncSink::NotReady(item)) => {
payload.tmp_item = Some(item);
return Ok(Decoding::Paused)
}
Ok(AsyncSink::Ready) => {
if eof {
return Ok(Decoding::Ready)
}
},
Err(_) => return Err(Error::Incomplete),
}
}
match payload.decoder.decode(&mut self.read_buf) {
Ok(Async::Ready(Some(bytes))) => {
match payload.tx.start_send(PayloadItem::Chunk(bytes)) {
Ok(AsyncSink::NotReady(item)) => {
payload.tmp_item = Some(item);
return Ok(Decoding::Paused)
}
Ok(AsyncSink::Ready) => {
continue
}
Err(_) => return Err(Error::Incomplete),
}
payload.tx.feed_data(bytes)
},
Ok(Async::Ready(None)) => {
match payload.tx.start_send(PayloadItem::Eof) {
Ok(AsyncSink::NotReady(item)) => {
payload.tmp_item = Some(item);
return Ok(Decoding::Paused)
}
Ok(AsyncSink::Ready) => {
return Ok(Decoding::Ready)
}
Err(_) => return Err(Error::Incomplete),
}
payload.tx.feed_eof();
return Ok(Decoding::Ready)
},
Ok(Async::NotReady) => return Ok(Decoding::NotReady),
Err(_) => return Err(Error::Incomplete),
@ -113,9 +79,11 @@ impl Reader {
}
}
pub fn parse<T>(&mut self, io: &mut T) -> Poll<(HttpRequest, Option<Payload>), Error>
pub fn parse<T>(&mut self, io: &mut T) -> Poll<(HttpRequest, Payload), Error>
where T: AsyncRead
{
loop {
match self.decode()? {
Decoding::Paused => return Ok(Async::NotReady),
@ -137,11 +105,10 @@ impl Reader {
match try!(parse(&mut self.read_buf)) {
Some((msg, decoder)) => {
let payload = if let Some(decoder) = decoder {
let (tx, rx) = channel(32);
let (tx, rx) = Payload::new(false);
let payload = PayloadInfo {
tx: tx,
decoder: decoder,
tmp_item: None,
};
self.payload = Some(payload);
@ -170,9 +137,10 @@ impl Reader {
}
}
}
Some(rx)
rx
} else {
None
let (_, rx) = Payload::new(true);
rx
};
return Ok(Async::Ready((msg, payload)));
},

View File

@ -6,7 +6,8 @@ use actix::Actor;
use http::Method;
use task::Task;
use route::{Route, Payload, RouteHandler};
use route::{Route, RouteHandler};
use payload::Payload;
use context::HttpContext;
use httpcodes::HTTPMethodNotAllowed;
use httpmessage::{HttpRequest, HttpResponse, IntoHttpResponse};
@ -92,7 +93,7 @@ impl<S> Resource<S> where S: 'static {
impl<S: 'static> RouteHandler<S> for Resource<S> {
fn handle(&self, req: HttpRequest, payload: Option<Payload>, state: Rc<S>) -> Task {
fn handle(&self, req: HttpRequest, payload: Payload, state: Rc<S>) -> Task {
if let Some(handler) = self.routes.get(req.method()) {
handler.handle(req, payload, state)
} else {

View File

@ -3,40 +3,13 @@ use std::marker::PhantomData;
use actix::Actor;
use bytes::Bytes;
use futures::unsync::mpsc::Receiver;
use task::Task;
use context::HttpContext;
use resource::Reply;
use payload::Payload;
use httpmessage::{HttpRequest, HttpResponse};
/// Stream of `PayloadItem`'s
pub type Payload = Receiver<PayloadItem>;
/// `PayloadItem` represents one payload item
#[derive(Debug)]
pub enum PayloadItem {
/// Indicates end of payload stream
Eof,
/// Chunk of bytes
Chunk(Bytes)
}
impl PayloadItem {
/// Is item an eof
pub fn is_eof(&self) -> bool {
match *self {
PayloadItem::Eof => true,
_ => false,
}
}
/// Is item a chunk
pub fn is_chunk(&self) -> bool {
!self.is_eof()
}
}
#[doc(hidden)]
#[derive(Debug)]
#[cfg_attr(feature="cargo-clippy", allow(large_enum_variant))]
@ -47,7 +20,7 @@ pub enum Frame {
/// Trait defines object that could be regestered as resource route
pub trait RouteHandler<S>: 'static {
fn handle(&self, req: HttpRequest, payload: Option<Payload>, state: Rc<S>) -> Task;
fn handle(&self, req: HttpRequest, payload: Payload, state: Rc<S>) -> Task;
}
/// Actors with ability to handle http requests
@ -60,9 +33,7 @@ pub trait Route: Actor<Context=HttpContext<Self>> {
/// result immediately with `Reply::reply` or `Reply::with`.
/// Actor itself could be returned for handling streaming request/response.
/// In that case `HttpContext::start` and `HttpContext::write` has to be used.
fn request(req: HttpRequest,
payload: Option<Payload>,
ctx: &mut HttpContext<Self>) -> Reply<Self>;
fn request(req: HttpRequest, payload: Payload, ctx: &mut HttpContext<Self>) -> Reply<Self>;
/// This method creates `RouteFactory` for this actor.
fn factory() -> RouteFactory<Self, Self::State> {
@ -77,7 +48,7 @@ impl<A, S> RouteHandler<S> for RouteFactory<A, S>
where A: Route<State=S>,
S: 'static
{
fn handle(&self, req: HttpRequest, payload: Option<Payload>, state: Rc<A::State>) -> Task
fn handle(&self, req: HttpRequest, payload: Payload, state: Rc<A::State>) -> Task
{
let mut ctx = HttpContext::new(state);
A::request(req, payload, &mut ctx).into(ctx)

View File

@ -4,14 +4,15 @@ use std::collections::HashMap;
use route_recognizer::{Router as Recognizer};
use task::Task;
use route::{Payload, RouteHandler};
use payload::Payload;
use route::RouteHandler;
use resource::Resource;
use application::Application;
use httpcodes::HTTPNotFound;
use httpmessage::{HttpRequest, IntoHttpResponse};
pub(crate) trait Handler: 'static {
fn handle(&self, req: HttpRequest, payload: Option<Payload>) -> Task;
fn handle(&self, req: HttpRequest, payload: Payload) -> Task;
}
/// Request routing map
@ -127,7 +128,7 @@ struct Router {
impl Router {
pub fn call(&self, req: HttpRequest, payload: Option<Payload>) -> Task
pub fn call(&self, req: HttpRequest, payload: Payload) -> Task
{
if let Ok(h) = self.resources.recognize(req.path()) {
h.handler.handle(req.with_params(h.params), payload, Rc::new(()))

View File

@ -18,8 +18,7 @@ use httpmessage::{Body, HttpResponse};
type FrameStream = Stream<Item=Frame, Error=io::Error>;
const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific
const DEFAULT_LIMIT: usize = 65_536; // max buffer size 64k
const MAX_WRITE_BUFFER_SIZE: usize = 65_536; // max buffer size 64k
#[derive(PartialEq, Debug)]
enum TaskRunningState {
@ -239,7 +238,7 @@ impl Task {
// should pause task
if self.state != TaskRunningState::Done {
if self.buffer.len() > DEFAULT_LIMIT {
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
self.state = TaskRunningState::Paused;
} else if self.state == TaskRunningState::Paused {
self.state = TaskRunningState::Running;

View File

@ -21,25 +21,20 @@
//! impl Route for WsRoute {
//! type State = ();
//!
//! fn request(req: HttpRequest, payload: Option<Payload>,
//! ctx: &mut HttpContext<Self>) -> Reply<Self>
//! fn request(req: HttpRequest, payload: Payload, ctx: &mut HttpContext<Self>) -> Reply<Self>
//! {
//! if let Some(payload) = payload {
//! // WebSocket handshake
//! match ws::handshake(req) {
//! Ok(resp) => {
//! // Send handshake response to peer
//! ctx.start(resp);
//! // Map Payload into WsStream
//! ctx.add_stream(ws::WsStream::new(payload));
//! // Start ws messages processing
//! Reply::stream(WsRoute)
//! },
//! Err(err) =>
//! Reply::reply(err)
//! }
//! } else {
//! Reply::with(req, httpcodes::HTTPBadRequest)
//! // WebSocket handshake
//! match ws::handshake(req) {
//! Ok(resp) => {
//! // Send handshake response to peer
//! ctx.start(resp);
//! // Map Payload into WsStream
//! ctx.add_stream(ws::WsStream::new(payload));
//! // Start ws messages processing
//! Reply::stream(WsRoute)
//! },
//! Err(err) =>
//! Reply::reply(err)
//! }
//! }
//! }
@ -77,7 +72,8 @@ use hyper::header;
use actix::Actor;
use context::HttpContext;
use route::{Route, Payload, PayloadItem};
use route::Route;
use payload::Payload;
use httpcodes::{HTTPBadRequest, HTTPMethodNotAllowed};
use httpmessage::{Body, ConnectionType, HttpRequest, HttpResponse, IntoHttpResponse};
@ -204,21 +200,18 @@ impl Stream for WsStream {
let mut done = false;
loop {
match self.rx.poll() {
Ok(Async::Ready(Some(item))) => {
match item {
PayloadItem::Eof =>
return Ok(Async::Ready(None)),
PayloadItem::Chunk(chunk) => {
self.buf.extend(chunk)
}
}
match self.rx.readany() {
Async::Ready(Some(chunk)) => {
self.buf.extend(chunk)
}
Ok(Async::Ready(None)) => done = true,
Ok(Async::NotReady) => {},
Err(err) => return Err(err),
Async::Ready(None) => {
done = true;
}
Async::NotReady => break,
}
}
loop {
match wsframe::Frame::parse(&mut self.buf) {
Ok(Some(frame)) => {
trace!("Frame {}", frame);