1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-30 18:34:36 +01:00

refactor pipeline

This commit is contained in:
Nikolay Kim 2017-12-01 15:45:15 -08:00
parent 9a1ba527c0
commit 47645626c4
7 changed files with 745 additions and 762 deletions

View File

@ -12,12 +12,16 @@ use actix::fut::ActorFuture;
use actix::dev::{AsyncContextApi, ActorAddressCell, ActorItemsCell, ActorWaitCell, SpawnHandle, use actix::dev::{AsyncContextApi, ActorAddressCell, ActorItemsCell, ActorWaitCell, SpawnHandle,
Envelope, ToEnvelope, RemoteEnvelope}; Envelope, ToEnvelope, RemoteEnvelope};
use task::{IoContext, DrainFut};
use body::{Body, Binary}; use body::{Body, Binary};
use error::Error; use error::Error;
use httprequest::HttpRequest; use httprequest::HttpRequest;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
use pipeline::DrainFut;
pub(crate) trait IoContext: 'static {
fn disconnected(&mut self);
fn poll(&mut self) -> Poll<Option<Frame>, Error>;
}
#[derive(Debug)] #[derive(Debug)]
pub(crate) enum Frame { pub(crate) enum Frame {
@ -45,6 +49,7 @@ impl<A, S> ActorContext for HttpContext<A, S> where A: Actor<Context=Self>
{ {
/// Stop actor execution /// Stop actor execution
fn stop(&mut self) { fn stop(&mut self) {
self.stream.push_back(Frame::Payload(None));
self.items.stop(); self.items.stop();
self.address.close(); self.address.close();
if self.state == ActorState::Running { if self.state == ActorState::Running {
@ -150,6 +155,11 @@ impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> {
} }
} }
/// Indicate end of streamimng payload. Also this method calls `Self::close`.
pub fn write_eof(&mut self) {
self.stop();
}
/// Returns drain future /// Returns drain future
pub fn drain(&mut self) -> Drain<A> { pub fn drain(&mut self) -> Drain<A> {
let fut = Rc::new(RefCell::new(DrainFut::default())); let fut = Rc::new(RefCell::new(DrainFut::default()));

View File

@ -54,7 +54,7 @@ pub(crate) struct Http1<T: AsyncWrite + 'static, H: 'static> {
} }
struct Entry { struct Entry {
task: Pipeline, pipe: Pipeline,
eof: bool, eof: bool,
error: bool, error: bool,
finished: bool, finished: bool,
@ -108,7 +108,7 @@ impl<T, H> Http1<T, H>
return Err(()) return Err(())
} }
match item.task.poll_io(&mut self.stream) { match item.pipe.poll_io(&mut self.stream) {
Ok(Async::Ready(ready)) => { Ok(Async::Ready(ready)) => {
not_ready = false; not_ready = false;
@ -129,13 +129,13 @@ impl<T, H> Http1<T, H>
}, },
Err(err) => { Err(err) => {
// it is not possible to recover from error // it is not possible to recover from error
// during task handling, so just drop connection // during pipe handling, so just drop connection
error!("Unhandled error: {}", err); error!("Unhandled error: {}", err);
return Err(()) return Err(())
} }
} }
} else if !item.finished { } else if !item.finished {
match item.task.poll() { match item.pipe.poll() {
Ok(Async::NotReady) => (), Ok(Async::NotReady) => (),
Ok(Async::Ready(_)) => { Ok(Async::Ready(_)) => {
not_ready = false; not_ready = false;
@ -181,11 +181,11 @@ impl<T, H> Http1<T, H>
self.keepalive_timer.take(); self.keepalive_timer.take();
// start request processing // start request processing
let mut task = None; let mut pipe = None;
for h in self.router.iter() { for h in self.router.iter() {
req = match h.handle(req) { req = match h.handle(req) {
Ok(t) => { Ok(t) => {
task = Some(t); pipe = Some(t);
break break
}, },
Err(req) => req, Err(req) => req,
@ -193,7 +193,7 @@ impl<T, H> Http1<T, H>
} }
self.tasks.push_back( self.tasks.push_back(
Entry {task: task.unwrap_or_else(|| Pipeline::error(HTTPNotFound)), Entry {pipe: pipe.unwrap_or_else(|| Pipeline::error(HTTPNotFound)),
eof: false, eof: false,
error: false, error: false,
finished: false}); finished: false});
@ -206,7 +206,7 @@ impl<T, H> Http1<T, H>
self.error = true; self.error = true;
self.stream.disconnected(); self.stream.disconnected();
for entry in &mut self.tasks { for entry in &mut self.tasks {
entry.task.disconnected() entry.pipe.disconnected()
} }
}, },
Err(err) => { Err(err) => {
@ -214,7 +214,7 @@ impl<T, H> Http1<T, H>
not_ready = false; not_ready = false;
self.stream.disconnected(); self.stream.disconnected();
for entry in &mut self.tasks { for entry in &mut self.tasks {
entry.task.disconnected() entry.pipe.disconnected()
} }
// kill keepalive // kill keepalive
@ -227,7 +227,7 @@ impl<T, H> Http1<T, H>
if self.tasks.is_empty() { if self.tasks.is_empty() {
if let ReaderError::Error(err) = err { if let ReaderError::Error(err) = err {
self.tasks.push_back( self.tasks.push_back(
Entry {task: Pipeline::error(err.error_response()), Entry {pipe: Pipeline::error(err.error_response()),
eof: false, eof: false,
error: false, error: false,
finished: false}); finished: false});

View File

@ -57,7 +57,7 @@ mod payload;
mod resource; mod resource;
mod recognizer; mod recognizer;
mod route; mod route;
mod task; //mod task;
mod pipeline; mod pipeline;
mod staticfiles; mod staticfiles;
mod server; mod server;

View File

@ -706,7 +706,6 @@ mod tests {
"abbc761f78ff4d7cb7573b5a23f96ef0".to_owned(), payload); "abbc761f78ff4d7cb7573b5a23f96ef0".to_owned(), payload);
match multipart.poll() { match multipart.poll() {
Ok(Async::Ready(Some(item))) => { Ok(Async::Ready(Some(item))) => {
println!("{:?}", item);
match item { match item {
MultipartItem::Field(mut field) => { MultipartItem::Field(mut field) => {
assert_eq!(field.content_type().type_(), mime::TEXT); assert_eq!(field.content_type().type_(), mime::TEXT);

File diff suppressed because it is too large Load Diff

View File

@ -5,8 +5,7 @@ use actix::Actor;
use futures::Future; use futures::Future;
use error::Error; use error::Error;
use task::IoContext; use context::{HttpContext, IoContext};
use context::HttpContext;
use httprequest::HttpRequest; use httprequest::HttpRequest;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;

View File

@ -1,483 +0,0 @@
use std::{fmt, mem};
use std::rc::Rc;
use std::cell::RefCell;
use futures::{Async, Future, Poll};
use futures::task::{Task as FutureTask, current as current_task};
use route::{Reply, ReplyItem};
use body::{Body, BodyStream, Binary};
use context::Frame;
use h1writer::{Writer, WriterState};
use error::{Error, UnexpectedTaskFrame};
use pipeline::MiddlewaresResponse;
use httprequest::HttpRequest;
use httpresponse::HttpResponse;
#[derive(PartialEq, Debug)]
enum TaskRunningState {
Paused,
Running,
Done,
}
impl TaskRunningState {
fn is_done(&self) -> bool {
*self == TaskRunningState::Done
}
fn pause(&mut self) {
if *self != TaskRunningState::Done {
*self = TaskRunningState::Paused
}
}
fn resume(&mut self) {
if *self != TaskRunningState::Done {
*self = TaskRunningState::Running
}
}
}
enum ResponseState {
Reading,
Ready(HttpResponse),
Middlewares(MiddlewaresResponse),
Prepared(Option<HttpResponse>),
}
enum IOState {
Response,
Payload(BodyStream),
Context,
Done,
}
enum TaskStream {
None,
Context(Box<IoContext>),
Response(Box<Future<Item=HttpResponse, Error=Error>>),
}
impl IOState {
fn is_done(&self) -> bool {
match *self {
IOState::Done => true,
_ => false
}
}
}
impl ResponseState {
fn is_reading(&self) -> bool {
match *self {
ResponseState::Reading => true,
_ => false
}
}
}
impl fmt::Debug for ResponseState {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
ResponseState::Reading => write!(f, "ResponseState::Reading"),
ResponseState::Ready(_) => write!(f, "ResponseState::Ready"),
ResponseState::Middlewares(_) => write!(f, "ResponseState::Middlewares"),
ResponseState::Prepared(_) => write!(f, "ResponseState::Prepared"),
}
}
}
impl fmt::Debug for IOState {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
IOState::Response => write!(f, "IOState::Response"),
IOState::Payload(_) => write!(f, "IOState::Payload"),
IOState::Context => write!(f, "IOState::Context"),
IOState::Done => write!(f, "IOState::Done"),
}
}
}
pub(crate) trait IoContext: 'static {
fn disconnected(&mut self);
fn poll(&mut self) -> Poll<Option<Frame>, Error>;
}
/// Future that resolves when all buffered data get sent
#[doc(hidden)]
#[derive(Debug)]
pub struct DrainFut {
drained: bool,
task: Option<FutureTask>,
}
impl Default for DrainFut {
fn default() -> DrainFut {
DrainFut {
drained: false,
task: None,
}
}
}
impl DrainFut {
fn set(&mut self) {
self.drained = true;
if let Some(task) = self.task.take() {
task.notify()
}
}
}
impl Future for DrainFut {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
if self.drained {
Ok(Async::Ready(()))
} else {
self.task = Some(current_task());
Ok(Async::NotReady)
}
}
}
pub(crate) struct Task {
running: TaskRunningState,
response: ResponseState,
iostate: IOState,
stream: TaskStream,
drain: Vec<Rc<RefCell<DrainFut>>>,
middlewares: Option<MiddlewaresResponse>,
}
impl Task {
pub(crate) fn new(reply: Reply) -> Task {
match reply.into() {
ReplyItem::Message(msg) => {
Task::from_response(msg)
},
ReplyItem::Actor(ctx) => {
Task { running: TaskRunningState::Running,
response: ResponseState::Reading,
iostate: IOState::Response,
drain: Vec::new(),
stream: TaskStream::Context(ctx),
middlewares: None }
}
ReplyItem::Future(fut) => {
Task { running: TaskRunningState::Running,
response: ResponseState::Reading,
iostate: IOState::Response,
drain: Vec::new(),
stream: TaskStream::Response(fut),
middlewares: None }
}
}
}
pub(crate) fn from_response<R: Into<HttpResponse>>(response: R) -> Task {
Task { running: TaskRunningState::Running,
response: ResponseState::Ready(response.into()),
iostate: IOState::Response,
drain: Vec::new(),
stream: TaskStream::None,
middlewares: None }
}
pub(crate) fn from_error<E: Into<Error>>(err: E) -> Task {
Task::from_response(err.into())
}
pub(crate) fn response(&mut self) -> HttpResponse {
match self.response {
ResponseState::Prepared(ref mut state) => state.take().unwrap(),
_ => panic!("Internal state is broken"),
}
}
pub(crate) fn set_middlewares(&mut self, middlewares: MiddlewaresResponse) {
self.middlewares = Some(middlewares)
}
pub(crate) fn disconnected(&mut self) {
if let TaskStream::Context(ref mut ctx) = self.stream {
ctx.disconnected();
}
}
pub(crate) fn poll_io<T>(&mut self, io: &mut T, req: &mut HttpRequest) -> Poll<bool, Error>
where T: Writer
{
trace!("POLL-IO frames resp: {:?}, io: {:?}, running: {:?}",
self.response, self.iostate, self.running);
if self.iostate.is_done() { // response is completed
return Ok(Async::Ready(self.running.is_done()));
} else if self.drain.is_empty() && self.running != TaskRunningState::Paused {
// if task is paused, write buffer is probably full
loop {
let result = match mem::replace(&mut self.iostate, IOState::Done) {
IOState::Response => {
match self.poll_response(req) {
Ok(Async::Ready(mut resp)) => {
let result = io.start(req, &mut resp)?;
match resp.replace_body(Body::Empty) {
Body::Streaming(stream) | Body::Upgrade(stream) =>
self.iostate = IOState::Payload(stream),
Body::StreamingContext | Body::UpgradeContext =>
self.iostate = IOState::Context,
_ => (),
}
self.response = ResponseState::Prepared(Some(resp));
result
},
Ok(Async::NotReady) => {
self.iostate = IOState::Response;
return Ok(Async::NotReady)
}
Err(err) => {
let mut resp = err.into();
let result = io.start(req, &mut resp)?;
match resp.replace_body(Body::Empty) {
Body::Streaming(stream) | Body::Upgrade(stream) =>
self.iostate = IOState::Payload(stream),
_ => (),
}
self.response = ResponseState::Prepared(Some(resp));
result
}
}
},
IOState::Payload(mut body) => {
// always poll stream
if self.running == TaskRunningState::Running {
match self.poll()? {
Async::Ready(_) =>
self.running = TaskRunningState::Done,
Async::NotReady => (),
}
}
match body.poll() {
Ok(Async::Ready(None)) => {
self.iostate = IOState::Done;
io.write_eof()?;
break
},
Ok(Async::Ready(Some(chunk))) => {
self.iostate = IOState::Payload(body);
io.write(chunk.as_ref())?
}
Ok(Async::NotReady) => {
self.iostate = IOState::Payload(body);
break
},
Err(err) => return Err(err),
}
}
IOState::Context => {
match self.poll_context() {
Ok(Async::Ready(None)) => {
self.iostate = IOState::Done;
self.running = TaskRunningState::Done;
io.write_eof()?;
break
},
Ok(Async::Ready(Some(chunk))) => {
self.iostate = IOState::Context;
io.write(chunk.as_ref())?
}
Ok(Async::NotReady) => {
self.iostate = IOState::Context;
break
}
Err(err) => return Err(err),
}
}
IOState::Done => break,
};
match result {
WriterState::Pause => {
self.running.pause();
break
}
WriterState::Done =>
self.running.resume(),
}
}
}
// flush io
match io.poll_complete() {
Ok(Async::Ready(_)) => self.running.resume(),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => {
debug!("Error sending data: {}", err);
return Err(err.into())
}
}
// drain futures
if !self.drain.is_empty() {
for fut in &mut self.drain {
fut.borrow_mut().set()
}
self.drain.clear();
}
// response is completed
if self.iostate.is_done() {
if let ResponseState::Prepared(Some(ref mut resp)) = self.response {
resp.set_response_size(io.written())
}
Ok(Async::Ready(self.running.is_done()))
} else {
Ok(Async::NotReady)
}
}
pub(crate) fn poll_response(&mut self, req: &mut HttpRequest) -> Poll<HttpResponse, Error> {
loop {
let state = mem::replace(&mut self.response, ResponseState::Prepared(None));
match state {
ResponseState::Ready(response) => {
// run middlewares
if let Some(mut middlewares) = self.middlewares.take() {
match middlewares.response(req, response) {
Ok(Some(response)) =>
return Ok(Async::Ready(response)),
Ok(None) => {
// middlewares need to run some futures
self.response = ResponseState::Middlewares(middlewares);
continue
}
Err(err) => return Err(err),
}
} else {
return Ok(Async::Ready(response))
}
}
ResponseState::Middlewares(mut middlewares) => {
// process middlewares
match middlewares.poll(req) {
Ok(Async::NotReady) => {
self.response = ResponseState::Middlewares(middlewares);
return Ok(Async::NotReady)
},
Ok(Async::Ready(response)) =>
return Ok(Async::Ready(response)),
Err(err) =>
return Err(err),
}
}
_ => (),
}
self.response = state;
match mem::replace(&mut self.stream, TaskStream::None) {
TaskStream::None =>
return Ok(Async::NotReady),
TaskStream::Context(mut context) => {
loop {
match context.poll() {
Ok(Async::Ready(Some(frame))) => {
match frame {
Frame::Message(msg) => {
if !self.response.is_reading() {
error!("Unexpected message frame {:?}", msg);
return Err(UnexpectedTaskFrame.into())
}
self.stream = TaskStream::Context(context);
self.response = ResponseState::Ready(msg);
break
},
Frame::Payload(_) | Frame::Drain(_) => (),
}
},
Ok(Async::Ready(None)) => {
error!("Unexpected eof");
return Err(UnexpectedTaskFrame.into())
},
Ok(Async::NotReady) => {
self.stream = TaskStream::Context(context);
return Ok(Async::NotReady)
},
Err(err) =>
return Err(err),
}
}
},
TaskStream::Response(mut fut) => {
match fut.poll() {
Ok(Async::NotReady) => {
self.stream = TaskStream::Response(fut);
return Ok(Async::NotReady);
},
Ok(Async::Ready(response)) => {
self.response = ResponseState::Ready(response);
}
Err(err) =>
return Err(err)
}
}
}
}
}
pub(crate) fn poll(&mut self) -> Poll<(), Error> {
match self.stream {
TaskStream::None | TaskStream::Response(_) =>
Ok(Async::Ready(())),
TaskStream::Context(ref mut context) => {
loop {
match context.poll() {
Ok(Async::Ready(Some(_))) => (),
Ok(Async::Ready(None)) =>
return Ok(Async::Ready(())),
Ok(Async::NotReady) =>
return Ok(Async::NotReady),
Err(err) =>
return Err(err),
}
}
},
}
}
fn poll_context(&mut self) -> Poll<Option<Binary>, Error> {
match self.stream {
TaskStream::None | TaskStream::Response(_) =>
Err(UnexpectedTaskFrame.into()),
TaskStream::Context(ref mut context) => {
match context.poll() {
Ok(Async::Ready(Some(frame))) => {
match frame {
Frame::Message(msg) => {
error!("Unexpected message frame {:?}", msg);
Err(UnexpectedTaskFrame.into())
},
Frame::Payload(payload) => {
Ok(Async::Ready(payload))
},
Frame::Drain(fut) => {
self.drain.push(fut);
Ok(Async::NotReady)
}
}
},
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(err) => Err(err),
}
},
}
}
}