1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-01-22 23:05:56 +01:00

refactor streaming responses

This commit is contained in:
Nikolay Kim 2017-11-30 14:42:20 -08:00
parent a0bca2d4cf
commit 6e138bf373
14 changed files with 436 additions and 295 deletions

View File

@ -5,6 +5,8 @@
* HTTP/2 Support * HTTP/2 Support
* Refactor streaming responses
* Refactor error handling * Refactor error handling
* Asynchronous middlewares * Asynchronous middlewares

View File

@ -8,7 +8,7 @@ extern crate futures;
use actix_web::*; use actix_web::*;
use actix_web::middlewares::RequestSession; use actix_web::middlewares::RequestSession;
use futures::stream::{once, Once}; use futures::future::{FutureResult, result};
/// simple handler /// simple handler
fn index(mut req: HttpRequest) -> Result<HttpResponse> { fn index(mut req: HttpRequest) -> Result<HttpResponse> {
@ -31,15 +31,14 @@ fn index(mut req: HttpRequest) -> Result<HttpResponse> {
} }
/// async handler /// async handler
fn index_async(req: HttpRequest) -> Once<actix_web::Frame, Error> fn index_async(req: HttpRequest) -> FutureResult<HttpResponse, Error>
{ {
println!("{:?}", req); println!("{:?}", req);
once(Ok(HttpResponse::Ok() result(HttpResponse::Ok()
.content_type("text/html") .content_type("text/html")
.body(format!("Hello {}!", req.match_info().get("name").unwrap())) .body(format!("Hello {}!", req.match_info().get("name").unwrap()))
.unwrap() .map_err(|e| e.into()))
.into()))
} }
/// handler with path parameters like `/user/{name}/` /// handler with path parameters like `/user/{name}/`

View File

@ -56,7 +56,7 @@ impl Handler<ws::Message> for MyWebSocket {
} }
fn main() { fn main() {
::std::env::set_var("RUST_LOG", "actix_web=info"); ::std::env::set_var("RUST_LOG", "actix_web=trace");
let _ = env_logger::init(); let _ = env_logger::init();
let sys = actix::System::new("ws-example"); let sys = actix::System::new("ws-example");

View File

@ -1,4 +1,4 @@
# Overview # [WIP] Overview
Actix web provides some primitives to build web servers and applications with Rust. Actix web provides some primitives to build web servers and applications with Rust.
It provides routing, middlewares, pre-processing of requests, and post-processing of responses, It provides routing, middlewares, pre-processing of requests, and post-processing of responses,
@ -69,7 +69,7 @@ fn main() {
} }
``` ```
## Handler ## [WIP] Handler
A request handler can have different forms. A request handler can have different forms.

View File

@ -1,24 +1,29 @@
use std::fmt;
use std::rc::Rc; use std::rc::Rc;
use std::sync::Arc; use std::sync::Arc;
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use futures::Stream;
use route::Frame; use error::Error;
pub(crate) type BodyStream = Box<Stream<Item=Bytes, Error=Error>>;
/// Represents various types of http message body. /// Represents various types of http message body.
#[derive(Debug, PartialEq)]
pub enum Body { pub enum Body {
/// Empty response. `Content-Length` header is set to `0` /// Empty response. `Content-Length` header is set to `0`
Empty, Empty,
/// Specific response body. /// Specific response body.
Binary(Binary), Binary(Binary),
/// Streaming response body with specified length.
Length(u64),
/// Unspecified streaming response. Developer is responsible for setting /// Unspecified streaming response. Developer is responsible for setting
/// right `Content-Length` or `Transfer-Encoding` headers. /// right `Content-Length` or `Transfer-Encoding` headers.
Streaming, Streaming(BodyStream),
/// Upgrade connection. /// Upgrade connection.
Upgrade, Upgrade(BodyStream),
/// Special body type for actor streaming response.
StreamingContext,
/// Special body type for actor upgrade response.
UpgradeContext,
} }
/// Represents various types of binary body. /// Represents various types of binary body.
@ -45,7 +50,8 @@ impl Body {
/// Does this body streaming. /// Does this body streaming.
pub fn is_streaming(&self) -> bool { pub fn is_streaming(&self) -> bool {
match *self { match *self {
Body::Length(_) | Body::Streaming | Body::Upgrade => true, Body::Streaming(_) | Body::StreamingContext
| Body::Upgrade(_) | Body::UpgradeContext => true,
_ => false _ => false
} }
} }
@ -64,6 +70,43 @@ impl Body {
} }
} }
impl PartialEq for Body {
fn eq(&self, other: &Body) -> bool {
match *self {
Body::Empty => match *other {
Body::Empty => true,
_ => false,
},
Body::Binary(ref b) => match *other {
Body::Binary(ref b2) => b == b2,
_ => false,
},
Body::StreamingContext => match *other {
Body::StreamingContext => true,
_ => false,
},
Body::UpgradeContext => match *other {
Body::UpgradeContext => true,
_ => false,
},
Body::Streaming(_) | Body::Upgrade(_) => false,
}
}
}
impl fmt::Debug for Body {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Body::Empty => write!(f, "Body::Empty"),
Body::Binary(ref b) => write!(f, "Body::Binary({:?})", b),
Body::Streaming(_) => write!(f, "Body::Streaming(_)"),
Body::Upgrade(_) => write!(f, "Body::Upgrade(_)"),
Body::StreamingContext => write!(f, "Body::StreamingContext"),
Body::UpgradeContext => write!(f, "Body::UpgradeContext"),
}
}
}
impl<T> From<T> for Body where T: Into<Binary>{ impl<T> From<T> for Body where T: Into<Binary>{
fn from(b: T) -> Body { fn from(b: T) -> Body {
Body::Binary(b.into()) Body::Binary(b.into())
@ -195,12 +238,6 @@ impl AsRef<[u8]> for Binary {
} }
} }
impl From<Binary> for Frame {
fn from(b: Binary) -> Frame {
Frame::Payload(Some(b))
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
@ -209,8 +246,7 @@ mod tests {
fn test_body_is_streaming() { fn test_body_is_streaming() {
assert_eq!(Body::Empty.is_streaming(), false); assert_eq!(Body::Empty.is_streaming(), false);
assert_eq!(Body::Binary(Binary::from("")).is_streaming(), false); assert_eq!(Body::Binary(Binary::from("")).is_streaming(), false);
assert_eq!(Body::Length(100).is_streaming(), true); // assert_eq!(Body::Streaming.is_streaming(), true);
assert_eq!(Body::Streaming.is_streaming(), true);
} }
#[test] #[test]

View File

@ -3,7 +3,7 @@ use std::rc::Rc;
use std::cell::RefCell; use std::cell::RefCell;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::marker::PhantomData; use std::marker::PhantomData;
use futures::{Async, Future, Stream, Poll}; use futures::{Async, Future, Poll};
use futures::sync::oneshot::Sender; use futures::sync::oneshot::Sender;
use actix::{Actor, ActorState, ActorContext, AsyncContext, use actix::{Actor, ActorState, ActorContext, AsyncContext,
@ -13,13 +13,19 @@ use actix::dev::{AsyncContextApi, ActorAddressCell, ActorItemsCell, ActorWaitCel
Envelope, ToEnvelope, RemoteEnvelope}; Envelope, ToEnvelope, RemoteEnvelope};
use task::{IoContext, DrainFut}; use task::{IoContext, DrainFut};
use body::Binary; use body::{Body, Binary};
use error::Error; use error::Error;
use route::Frame;
use httprequest::HttpRequest; use httprequest::HttpRequest;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
#[derive(Debug)]
pub(crate) enum Frame {
Message(HttpResponse),
Payload(Option<Binary>),
Drain(Rc<RefCell<DrainFut>>),
}
/// Http actor execution context /// Http actor execution context
pub struct HttpContext<A, S=()> where A: Actor<Context=HttpContext<A, S>>, pub struct HttpContext<A, S=()> where A: Actor<Context=HttpContext<A, S>>,
{ {
@ -31,25 +37,14 @@ pub struct HttpContext<A, S=()> where A: Actor<Context=HttpContext<A, S>>,
stream: VecDeque<Frame>, stream: VecDeque<Frame>,
wait: ActorWaitCell<A>, wait: ActorWaitCell<A>,
request: HttpRequest<S>, request: HttpRequest<S>,
streaming: bool,
disconnected: bool, disconnected: bool,
} }
impl<A, S> IoContext for HttpContext<A, S> where A: Actor<Context=Self>, S: 'static {
fn disconnected(&mut self) {
self.items.stop();
self.disconnected = true;
if self.state == ActorState::Running {
self.state = ActorState::Stopping;
}
}
}
impl<A, S> ActorContext for HttpContext<A, S> where A: Actor<Context=Self> impl<A, S> ActorContext for HttpContext<A, S> where A: Actor<Context=Self>
{ {
/// Stop actor execution /// Stop actor execution
fn stop(&mut self) { fn stop(&mut self) {
self.stream.push_back(Frame::Payload(None));
self.items.stop(); self.items.stop();
self.address.close(); self.address.close();
if self.state == ActorState::Running { if self.state == ActorState::Running {
@ -116,6 +111,7 @@ impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> {
wait: ActorWaitCell::default(), wait: ActorWaitCell::default(),
stream: VecDeque::new(), stream: VecDeque::new(),
request: req, request: req,
streaming: false,
disconnected: false, disconnected: false,
} }
} }
@ -133,20 +129,25 @@ impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> {
&mut self.request &mut self.request
} }
/// Send response to peer
/// Start response processing
pub fn start<R: Into<HttpResponse>>(&mut self, response: R) { pub fn start<R: Into<HttpResponse>>(&mut self, response: R) {
self.stream.push_back(Frame::Message(response.into())) let resp = response.into();
match *resp.body() {
Body::StreamingContext | Body::UpgradeContext => self.streaming = true,
_ => (),
}
self.stream.push_back(Frame::Message(resp))
} }
/// Write payload /// Write payload
pub fn write<B: Into<Binary>>(&mut self, data: B) { pub fn write<B: Into<Binary>>(&mut self, data: B) {
if self.streaming {
if !self.disconnected {
self.stream.push_back(Frame::Payload(Some(data.into()))) self.stream.push_back(Frame::Payload(Some(data.into())))
} }
} else {
/// Indicate end of streamimng payload. Also this method calls `Self::close`. warn!("Trying to write response body for non-streaming response");
pub fn write_eof(&mut self) { }
self.stop();
} }
/// Returns drain future /// Returns drain future
@ -184,11 +185,15 @@ impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> {
} }
} }
#[doc(hidden)] impl<A, S> IoContext for HttpContext<A, S> where A: Actor<Context=Self>, S: 'static {
impl<A, S> Stream for HttpContext<A, S> where A: Actor<Context=Self>
{ fn disconnected(&mut self) {
type Item = Frame; self.items.stop();
type Error = Error; self.disconnected = true;
if self.state == ActorState::Running {
self.state = ActorState::Stopping;
}
}
fn poll(&mut self) -> Poll<Option<Frame>, Error> { fn poll(&mut self) -> Poll<Option<Frame>, Error> {
let act: &mut A = unsafe { let act: &mut A = unsafe {

View File

@ -381,28 +381,6 @@ impl PayloadEncoder {
resp.headers.remove(TRANSFER_ENCODING); resp.headers.remove(TRANSFER_ENCODING);
TransferEncoding::length(0) TransferEncoding::length(0)
}, },
Body::Length(n) => {
if resp.chunked() {
error!("Chunked transfer is enabled but body with specific length is specified");
}
if compression {
resp.headers.remove(CONTENT_LENGTH);
if version == Version::HTTP_2 {
resp.headers.remove(TRANSFER_ENCODING);
TransferEncoding::eof()
} else {
resp.headers.insert(
TRANSFER_ENCODING, HeaderValue::from_static("chunked"));
TransferEncoding::chunked()
}
} else {
resp.headers.insert(
CONTENT_LENGTH,
HeaderValue::from_str(format!("{}", n).as_str()).unwrap());
resp.headers.remove(TRANSFER_ENCODING);
TransferEncoding::length(n)
}
},
Body::Binary(ref mut bytes) => { Body::Binary(ref mut bytes) => {
if compression { if compression {
let transfer = TransferEncoding::eof(); let transfer = TransferEncoding::eof();
@ -435,7 +413,7 @@ impl PayloadEncoder {
TransferEncoding::length(bytes.len() as u64) TransferEncoding::length(bytes.len() as u64)
} }
} }
Body::Streaming => { Body::Streaming(_) | Body::StreamingContext => {
if resp.chunked() { if resp.chunked() {
resp.headers.remove(CONTENT_LENGTH); resp.headers.remove(CONTENT_LENGTH);
if version != Version::HTTP_11 { if version != Version::HTTP_11 {
@ -449,11 +427,23 @@ impl PayloadEncoder {
TRANSFER_ENCODING, HeaderValue::from_static("chunked")); TRANSFER_ENCODING, HeaderValue::from_static("chunked"));
TransferEncoding::chunked() TransferEncoding::chunked()
} }
} else if let Some(len) = resp.headers().get(CONTENT_LENGTH) {
// Content-Length
if let Ok(s) = len.to_str() {
if let Ok(len) = s.parse::<u64>() {
TransferEncoding::length(len)
} else {
debug!("illegal Content-Length: {:?}", len);
TransferEncoding::eof()
}
} else {
TransferEncoding::eof()
}
} else { } else {
TransferEncoding::eof() TransferEncoding::eof()
} }
} }
Body::Upgrade => { Body::Upgrade(_) | Body::UpgradeContext => {
if version == Version::HTTP_2 { if version == Version::HTTP_2 {
error!("Connection upgrade is forbidden for HTTP/2"); error!("Connection upgrade is forbidden for HTTP/2");
} else { } else {

View File

@ -11,7 +11,6 @@ use serde::Serialize;
use Cookie; use Cookie;
use body::Body; use body::Body;
use route::Frame;
use error::Error; use error::Error;
use encoding::ContentEncoding; use encoding::ContentEncoding;
@ -223,13 +222,6 @@ impl fmt::Debug for HttpResponse {
} }
} }
// TODO: remove
impl From<HttpResponse> for Frame {
fn from(resp: HttpResponse) -> Frame {
Frame::Message(resp)
}
}
#[derive(Debug)] #[derive(Debug)]
struct Parts { struct Parts {
version: Option<Version>, version: Option<Version>,
@ -535,12 +527,6 @@ mod tests {
assert_eq!(resp.status(), StatusCode::NO_CONTENT) assert_eq!(resp.status(), StatusCode::NO_CONTENT)
} }
#[test]
fn test_body() {
assert!(Body::Length(10).is_streaming());
assert!(Body::Streaming.is_streaming());
}
#[test] #[test]
fn test_upgrade() { fn test_upgrade() {
let resp = HttpResponse::build(StatusCode::OK) let resp = HttpResponse::build(StatusCode::OK)

View File

@ -82,7 +82,7 @@ pub use application::Application;
pub use httprequest::{HttpRequest, UrlEncoded}; pub use httprequest::{HttpRequest, UrlEncoded};
pub use httpresponse::HttpResponse; pub use httpresponse::HttpResponse;
pub use payload::{Payload, PayloadItem}; pub use payload::{Payload, PayloadItem};
pub use route::{Frame, Reply}; pub use route::Reply;
pub use resource::Resource; pub use resource::Resource;
pub use recognizer::Params; pub use recognizer::Params;
pub use server::HttpServer; pub use server::HttpServer;

View File

@ -391,11 +391,7 @@ impl MiddlewaresResponse {
} }
} }
pub fn poll(&mut self, req: &mut HttpRequest) -> Poll<Option<HttpResponse>, Error> { pub fn poll(&mut self, req: &mut HttpRequest) -> Poll<HttpResponse, Error> {
if self.fut.is_none() {
return Ok(Async::Ready(None))
}
loop { loop {
// poll latest fut // poll latest fut
let mut resp = match self.fut.as_mut().unwrap().poll() { let mut resp = match self.fut.as_mut().unwrap().poll() {
@ -410,7 +406,7 @@ impl MiddlewaresResponse {
loop { loop {
if self.idx == 0 { if self.idx == 0 {
return Ok(Async::Ready(Some(resp))) return Ok(Async::Ready(resp))
} else { } else {
match self.middlewares[self.idx].response(req, resp) { match self.middlewares[self.idx].response(req, resp) {
Response::Err(err) => Response::Err(err) =>

View File

@ -2,12 +2,13 @@ use std::marker::PhantomData;
use std::collections::HashMap; use std::collections::HashMap;
use http::Method; use http::Method;
use futures::Stream; use futures::Future;
use task::Task; use task::Task;
use error::Error; use error::Error;
use route::{Reply, RouteHandler, Frame, WrapHandler, Handler, StreamHandler}; use route::{Reply, RouteHandler, WrapHandler, Handler, StreamHandler};
use httprequest::HttpRequest; use httprequest::HttpRequest;
use httpresponse::HttpResponse;
use httpcodes::{HTTPNotFound, HTTPMethodNotAllowed}; use httpcodes::{HTTPNotFound, HTTPMethodNotAllowed};
/// Http resource /// Http resource
@ -68,7 +69,7 @@ impl<S> Resource<S> where S: 'static {
/// Register async handler for specified method. /// Register async handler for specified method.
pub fn async<F, R>(&mut self, method: Method, handler: F) pub fn async<F, R>(&mut self, method: Method, handler: F)
where F: Fn(HttpRequest<S>) -> R + 'static, where F: Fn(HttpRequest<S>) -> R + 'static,
R: Stream<Item=Frame, Error=Error> + 'static, R: Future<Item=HttpResponse, Error=Error> + 'static,
{ {
self.routes.insert(method, Box::new(StreamHandler::new(handler))); self.routes.insert(method, Box::new(StreamHandler::new(handler)));
} }

View File

@ -1,31 +1,14 @@
use std::rc::Rc;
use std::cell::RefCell;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::result::Result as StdResult; use std::result::Result as StdResult;
use actix::Actor; use actix::Actor;
use futures::Stream; use futures::Future;
use body::Binary;
use error::Error; use error::Error;
use context::HttpContext; use context::HttpContext;
use httprequest::HttpRequest; use httprequest::HttpRequest;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
use task::{Task, DrainFut, IoContext}; use task::{Task, IoContext};
#[doc(hidden)]
#[derive(Debug)]
pub enum Frame {
Message(HttpResponse),
Payload(Option<Binary>),
Drain(Rc<RefCell<DrainFut>>),
}
impl Frame {
pub fn eof() -> Frame {
Frame::Payload(None)
}
}
/// Trait defines object that could be regestered as route handler /// Trait defines object that could be regestered as route handler
#[allow(unused_variables)] #[allow(unused_variables)]
@ -55,8 +38,8 @@ pub struct Reply(ReplyItem);
enum ReplyItem { enum ReplyItem {
Message(HttpResponse), Message(HttpResponse),
Actor(Box<IoContext<Item=Frame, Error=Error>>), Actor(Box<IoContext>),
Stream(Box<Stream<Item=Frame, Error=Error>>), Future(Box<Future<Item=HttpResponse, Error=Error>>),
} }
impl Reply { impl Reply {
@ -69,10 +52,10 @@ impl Reply {
} }
/// Create async response /// Create async response
pub fn stream<S>(stream: S) -> Reply pub fn async<F>(fut: F) -> Reply
where S: Stream<Item=Frame, Error=Error> + 'static where F: Future<Item=HttpResponse, Error=Error> + 'static
{ {
Reply(ReplyItem::Stream(Box::new(stream))) Reply(ReplyItem::Future(Box::new(fut)))
} }
/// Send response /// Send response
@ -89,8 +72,8 @@ impl Reply {
ReplyItem::Actor(ctx) => { ReplyItem::Actor(ctx) => {
task.context(ctx) task.context(ctx)
} }
ReplyItem::Stream(stream) => { ReplyItem::Future(fut) => {
task.stream(stream) task.async(fut)
} }
} }
} }
@ -160,7 +143,7 @@ impl<S, H, R> RouteHandler<S> for WrapHandler<S, H, R>
pub(crate) pub(crate)
struct StreamHandler<S, R, F> struct StreamHandler<S, R, F>
where F: Fn(HttpRequest<S>) -> R + 'static, where F: Fn(HttpRequest<S>) -> R + 'static,
R: Stream<Item=Frame, Error=Error> + 'static, R: Future<Item=HttpResponse, Error=Error> + 'static,
S: 'static, S: 'static,
{ {
f: Box<F>, f: Box<F>,
@ -169,7 +152,7 @@ struct StreamHandler<S, R, F>
impl<S, R, F> StreamHandler<S, R, F> impl<S, R, F> StreamHandler<S, R, F>
where F: Fn(HttpRequest<S>) -> R + 'static, where F: Fn(HttpRequest<S>) -> R + 'static,
R: Stream<Item=Frame, Error=Error> + 'static, R: Future<Item=HttpResponse, Error=Error> + 'static,
S: 'static, S: 'static,
{ {
pub fn new(f: F) -> Self { pub fn new(f: F) -> Self {
@ -179,10 +162,10 @@ impl<S, R, F> StreamHandler<S, R, F>
impl<S, R, F> RouteHandler<S> for StreamHandler<S, R, F> impl<S, R, F> RouteHandler<S> for StreamHandler<S, R, F>
where F: Fn(HttpRequest<S>) -> R + 'static, where F: Fn(HttpRequest<S>) -> R + 'static,
R: Stream<Item=Frame, Error=Error> + 'static, R: Future<Item=HttpResponse, Error=Error> + 'static,
S: 'static, S: 'static,
{ {
fn handle(&self, req: HttpRequest<S>, task: &mut Task) { fn handle(&self, req: HttpRequest<S>, task: &mut Task) {
task.stream((self.f)(req)) task.async((self.f)(req))
} }
} }

View File

@ -1,20 +1,18 @@
use std::mem; use std::{fmt, mem};
use std::rc::Rc; use std::rc::Rc;
use std::cell::RefCell; use std::cell::RefCell;
use std::collections::VecDeque;
use futures::{Async, Future, Poll, Stream}; use futures::{Async, Future, Poll};
use futures::task::{Task as FutureTask, current as current_task}; use futures::task::{Task as FutureTask, current as current_task};
use body::{Body, BodyStream, Binary};
use context::Frame;
use h1writer::{Writer, WriterState}; use h1writer::{Writer, WriterState};
use error::{Error, UnexpectedTaskFrame}; use error::{Error, UnexpectedTaskFrame};
use route::Frame;
use pipeline::MiddlewaresResponse; use pipeline::MiddlewaresResponse;
use httprequest::HttpRequest; use httprequest::HttpRequest;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
type FrameStream = Stream<Item=Frame, Error=Error>;
#[derive(PartialEq, Debug)] #[derive(PartialEq, Debug)]
enum TaskRunningState { enum TaskRunningState {
Paused, Paused,
@ -38,27 +36,69 @@ impl TaskRunningState {
} }
} }
#[derive(PartialEq, Debug)] enum ResponseState {
enum TaskIOState { Reading,
ReadingMessage, Ready(HttpResponse),
ReadingPayload, Middlewares(MiddlewaresResponse),
Done, Prepared(Option<HttpResponse>),
} }
impl TaskIOState { enum IOState {
fn is_done(&self) -> bool { Response,
*self == TaskIOState::Done Payload(BodyStream),
} Context,
Done,
} }
enum TaskStream { enum TaskStream {
None, None,
Stream(Box<FrameStream>), Context(Box<IoContext>),
Context(Box<IoContext<Item=Frame, Error=Error>>), Response(Box<Future<Item=HttpResponse, Error=Error>>),
} }
pub(crate) trait IoContext: Stream<Item=Frame, Error=Error> + 'static { impl IOState {
fn is_done(&self) -> bool {
match *self {
IOState::Done => true,
_ => false
}
}
}
impl ResponseState {
fn is_reading(&self) -> bool {
match *self {
ResponseState::Reading => true,
_ => false
}
}
}
impl fmt::Debug for ResponseState {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
ResponseState::Reading => write!(f, "ResponseState::Reading"),
ResponseState::Ready(_) => write!(f, "ResponseState::Ready"),
ResponseState::Middlewares(_) => write!(f, "ResponseState::Middlewares"),
ResponseState::Prepared(_) => write!(f, "ResponseState::Prepared"),
}
}
}
impl fmt::Debug for IOState {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
IOState::Response => write!(f, "IOState::Response"),
IOState::Payload(_) => write!(f, "IOState::Payload"),
IOState::Context => write!(f, "IOState::Context"),
IOState::Done => write!(f, "IOState::Done"),
}
}
}
pub(crate) trait IoContext: 'static {
fn disconnected(&mut self); fn disconnected(&mut self);
fn poll(&mut self) -> Poll<Option<Frame>, Error>;
} }
/// Future that resolves when all buffered data get sent /// Future that resolves when all buffered data get sent
@ -104,12 +144,11 @@ impl Future for DrainFut {
} }
pub struct Task { pub struct Task {
state: TaskRunningState, running: TaskRunningState,
iostate: TaskIOState, response: ResponseState,
frames: VecDeque<Frame>, iostate: IOState,
stream: TaskStream, stream: TaskStream,
drain: Vec<Rc<RefCell<DrainFut>>>, drain: Vec<Rc<RefCell<DrainFut>>>,
prepared: Option<HttpResponse>,
disconnected: bool, disconnected: bool,
middlewares: Option<MiddlewaresResponse>, middlewares: Option<MiddlewaresResponse>,
} }
@ -118,12 +157,11 @@ pub struct Task {
impl Default for Task { impl Default for Task {
fn default() -> Task { fn default() -> Task {
Task { state: TaskRunningState::Running, Task { running: TaskRunningState::Running,
iostate: TaskIOState::ReadingMessage, response: ResponseState::Reading,
frames: VecDeque::new(), iostate: IOState::Response,
drain: Vec::new(), drain: Vec::new(),
stream: TaskStream::None, stream: TaskStream::None,
prepared: None,
disconnected: false, disconnected: false,
middlewares: None } middlewares: None }
} }
@ -132,16 +170,11 @@ impl Default for Task {
impl Task { impl Task {
pub(crate) fn from_response<R: Into<HttpResponse>>(response: R) -> Task { pub(crate) fn from_response<R: Into<HttpResponse>>(response: R) -> Task {
let mut frames = VecDeque::new(); Task { running: TaskRunningState::Running,
frames.push_back(Frame::Message(response.into())); response: ResponseState::Ready(response.into()),
frames.push_back(Frame::Payload(None)); iostate: IOState::Response,
Task { state: TaskRunningState::Running,
iostate: TaskIOState::Done,
frames: frames,
drain: Vec::new(), drain: Vec::new(),
stream: TaskStream::None, stream: TaskStream::None,
prepared: None,
disconnected: false, disconnected: false,
middlewares: None } middlewares: None }
} }
@ -151,27 +184,33 @@ impl Task {
} }
pub fn reply<R: Into<HttpResponse>>(&mut self, response: R) { pub fn reply<R: Into<HttpResponse>>(&mut self, response: R) {
self.frames.push_back(Frame::Message(response.into())); let state = &mut self.response;
self.frames.push_back(Frame::Payload(None)); match *state {
self.iostate = TaskIOState::Done; ResponseState::Reading =>
*state = ResponseState::Ready(response.into()),
_ => panic!("Internal task state is broken"),
}
} }
pub fn error<E: Into<Error>>(&mut self, err: E) { pub fn error<E: Into<Error>>(&mut self, err: E) {
self.reply(err.into()) self.reply(err.into())
} }
pub(crate) fn context(&mut self, ctx: Box<IoContext<Item=Frame, Error=Error>>) { pub(crate) fn context(&mut self, ctx: Box<IoContext>) {
self.stream = TaskStream::Context(ctx); self.stream = TaskStream::Context(ctx);
} }
pub fn stream<S>(&mut self, stream: S) pub fn async<F>(&mut self, fut: F)
where S: Stream<Item=Frame, Error=Error> + 'static where F: Future<Item=HttpResponse, Error=Error> + 'static
{ {
self.stream = TaskStream::Stream(Box::new(stream)); self.stream = TaskStream::Response(Box::new(fut));
} }
pub(crate) fn response(&mut self) -> HttpResponse { pub(crate) fn response(&mut self) -> HttpResponse {
self.prepared.take().unwrap() match self.response {
ResponseState::Prepared(ref mut state) => state.take().unwrap(),
_ => panic!("Internal state is broken"),
}
} }
pub(crate) fn set_middlewares(&mut self, middlewares: MiddlewaresResponse) { pub(crate) fn set_middlewares(&mut self, middlewares: MiddlewaresResponse) {
@ -188,97 +227,112 @@ impl Task {
pub(crate) fn poll_io<T>(&mut self, io: &mut T, req: &mut HttpRequest) -> Poll<bool, Error> pub(crate) fn poll_io<T>(&mut self, io: &mut T, req: &mut HttpRequest) -> Poll<bool, Error>
where T: Writer where T: Writer
{ {
trace!("POLL-IO frames:{:?}", self.frames.len()); trace!("POLL-IO frames resp: {:?}, io: {:?}, running: {:?}",
self.response, self.iostate, self.running);
// response is completed if self.iostate.is_done() { // response is completed
if self.frames.is_empty() && self.iostate.is_done() { return Ok(Async::Ready(self.running.is_done()));
return Ok(Async::Ready(self.state.is_done())); } else if self.drain.is_empty() && self.running != TaskRunningState::Paused {
} else if self.drain.is_empty() { // if task is paused, write buffer is probably full
// poll stream
if self.state == TaskRunningState::Running { loop {
let result = match mem::replace(&mut self.iostate, IOState::Done) {
IOState::Response => {
match self.poll_response(req) {
Ok(Async::Ready(mut resp)) => {
let result = io.start(req, &mut resp)?;
match resp.replace_body(Body::Empty) {
Body::Streaming(stream) | Body::Upgrade(stream) =>
self.iostate = IOState::Payload(stream),
Body::StreamingContext | Body::UpgradeContext =>
self.iostate = IOState::Context,
_ => (),
}
self.response = ResponseState::Prepared(Some(resp));
result
},
Ok(Async::NotReady) => {
self.iostate = IOState::Response;
return Ok(Async::NotReady)
}
Err(err) => {
let mut resp = err.into();
let result = io.start(req, &mut resp)?;
match resp.replace_body(Body::Empty) {
Body::Streaming(stream) | Body::Upgrade(stream) =>
self.iostate = IOState::Payload(stream),
_ => (),
}
self.response = ResponseState::Prepared(Some(resp));
result
}
}
},
IOState::Payload(mut body) => {
// always poll stream
if self.running == TaskRunningState::Running {
match self.poll()? { match self.poll()? {
Async::Ready(_) => Async::Ready(_) =>
self.state = TaskRunningState::Done, self.running = TaskRunningState::Done,
Async::NotReady => (), Async::NotReady => (),
} }
} }
// process middlewares response match body.poll() {
if let Some(mut middlewares) = self.middlewares.take() { Ok(Async::Ready(None)) => {
match middlewares.poll(req)? { self.iostate = IOState::Done;
Async::NotReady => { io.write_eof()?;
self.middlewares = Some(middlewares); break
return Ok(Async::NotReady);
}
Async::Ready(None) => {
self.middlewares = Some(middlewares);
}
Async::Ready(Some(mut response)) => {
let result = io.start(req, &mut response)?;
self.prepared = Some(response);
match result {
WriterState::Pause => self.state.pause(),
WriterState::Done => self.state.resume(),
}
}, },
Ok(Async::Ready(Some(chunk))) => {
self.iostate = IOState::Payload(body);
io.write(chunk.as_ref())?
}
Ok(Async::NotReady) => {
self.iostate = IOState::Payload(body);
break
},
Err(err) => return Err(err),
} }
} }
IOState::Context => {
// if task is paused, write buffer is probably full match self.poll_context() {
if self.state != TaskRunningState::Paused { Ok(Async::Ready(None)) => {
// process exiting frames self.iostate = IOState::Done;
while let Some(frame) = self.frames.pop_front() { self.running = TaskRunningState::Done;
trace!("IO Frame: {:?}", frame); io.write_eof()?;
let res = match frame { break
Frame::Message(mut resp) => { },
// run middlewares Ok(Async::Ready(Some(chunk))) => {
if let Some(mut middlewares) = self.middlewares.take() { self.iostate = IOState::Context;
match middlewares.response(req, resp) { io.write(chunk.as_ref())?
Ok(Some(mut resp)) => {
let result = io.start(req, &mut resp)?;
self.prepared = Some(resp);
result
} }
Ok(None) => { Ok(Async::NotReady) => {
// middlewares need to run some futures self.iostate = IOState::Context;
self.middlewares = Some(middlewares); break
return self.poll_io(io, req)
} }
Err(err) => return Err(err), Err(err) => return Err(err),
} }
} else {
let result = io.start(req, &mut resp)?;
self.prepared = Some(resp);
result
}
}
Frame::Payload(Some(chunk)) => {
io.write(chunk.as_ref())?
},
Frame::Payload(None) => {
self.iostate = TaskIOState::Done;
io.write_eof()?
},
Frame::Drain(fut) => {
self.drain.push(fut);
break
} }
IOState::Done => break,
}; };
match res { match result {
WriterState::Pause => { WriterState::Pause => {
self.state.pause(); self.running.pause();
break break
} }
WriterState::Done => self.state.resume(), WriterState::Done =>
} self.running.resume(),
} }
} }
} }
// flush io // flush io
match io.poll_complete() { match io.poll_complete() {
Ok(Async::Ready(_)) => self.state.resume(), Ok(Async::Ready(_)) => self.running.resume(),
Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => { Err(err) => {
debug!("Error sending data: {}", err); debug!("Error sending data: {}", err);
@ -296,46 +350,116 @@ impl Task {
// response is completed // response is completed
if self.iostate.is_done() { if self.iostate.is_done() {
if let Some(ref mut resp) = self.prepared { if let ResponseState::Prepared(Some(ref mut resp)) = self.response {
resp.set_response_size(io.written()); resp.set_response_size(io.written())
} }
Ok(Async::Ready(self.state.is_done())) Ok(Async::Ready(self.running.is_done()))
} else { } else {
Ok(Async::NotReady) Ok(Async::NotReady)
} }
} }
fn poll_stream<S>(&mut self, stream: &mut S) -> Poll<(), Error> pub(crate) fn poll_response(&mut self, req: &mut HttpRequest) -> Poll<HttpResponse, Error> {
where S: Stream<Item=Frame, Error=Error>
{
loop { loop {
match stream.poll() { let state = mem::replace(&mut self.response, ResponseState::Prepared(None));
Ok(Async::Ready(Some(frame))) => { match state {
match frame { ResponseState::Ready(response) => {
Frame::Message(ref msg) => { // run middlewares
if self.iostate != TaskIOState::ReadingMessage { if let Some(mut middlewares) = self.middlewares.take() {
error!("Unexpected frame {:?}", frame); match middlewares.response(req, response) {
return Err(UnexpectedTaskFrame.into()) Ok(Some(response)) =>
return Ok(Async::Ready(response)),
Ok(None) => {
// middlewares need to run some futures
self.response = ResponseState::Middlewares(middlewares);
continue
}
Err(err) => return Err(err),
} }
let upgrade = msg.upgrade();
if upgrade || msg.body().is_streaming() {
self.iostate = TaskIOState::ReadingPayload;
} else { } else {
self.iostate = TaskIOState::Done; return Ok(Async::Ready(response))
} }
},
Frame::Payload(ref chunk) => {
if chunk.is_none() {
self.iostate = TaskIOState::Done;
} else if self.iostate != TaskIOState::ReadingPayload {
error!("Unexpected frame {:?}", self.iostate);
return Err(UnexpectedTaskFrame.into())
} }
ResponseState::Middlewares(mut middlewares) => {
// process middlewares
match middlewares.poll(req) {
Ok(Async::NotReady) => {
self.response = ResponseState::Middlewares(middlewares);
return Ok(Async::NotReady)
}, },
Ok(Async::Ready(response)) =>
return Ok(Async::Ready(response)),
Err(err) =>
return Err(err),
}
}
_ => (), _ => (),
} }
self.frames.push_back(frame) self.response = state;
match mem::replace(&mut self.stream, TaskStream::None) {
TaskStream::None =>
return Ok(Async::NotReady),
TaskStream::Context(mut context) => {
loop {
match context.poll() {
Ok(Async::Ready(Some(frame))) => {
match frame {
Frame::Message(msg) => {
if !self.response.is_reading() {
error!("Unexpected message frame {:?}", msg);
return Err(UnexpectedTaskFrame.into())
}
self.stream = TaskStream::Context(context);
self.response = ResponseState::Ready(msg);
break
}, },
Frame::Payload(_) => (),
Frame::Drain(fut) => {
self.drain.push(fut);
self.stream = TaskStream::Context(context);
break
}
}
},
Ok(Async::Ready(None)) => {
error!("Unexpected eof");
return Err(UnexpectedTaskFrame.into())
},
Ok(Async::NotReady) => {
self.stream = TaskStream::Context(context);
return Ok(Async::NotReady)
},
Err(err) =>
return Err(err),
}
}
},
TaskStream::Response(mut fut) => {
match fut.poll() {
Ok(Async::NotReady) => {
self.stream = TaskStream::Response(fut);
return Ok(Async::NotReady);
},
Ok(Async::Ready(response)) => {
self.response = ResponseState::Ready(response);
}
Err(err) =>
return Err(err)
}
}
}
}
}
pub(crate) fn poll(&mut self) -> Poll<(), Error> {
match self.stream {
TaskStream::None | TaskStream::Response(_) =>
Ok(Async::Ready(())),
TaskStream::Context(ref mut context) => {
loop {
match context.poll() {
Ok(Async::Ready(Some(_))) => (),
Ok(Async::Ready(None)) => Ok(Async::Ready(None)) =>
return Ok(Async::Ready(())), return Ok(Async::Ready(())),
Ok(Async::NotReady) => Ok(Async::NotReady) =>
@ -344,17 +468,36 @@ impl Task {
return Err(err), return Err(err),
} }
} }
},
}
} }
pub(crate) fn poll(&mut self) -> Poll<(), Error> { fn poll_context(&mut self) -> Poll<Option<Binary>, Error> {
let mut s = mem::replace(&mut self.stream, TaskStream::None); match self.stream {
TaskStream::None | TaskStream::Response(_) =>
let result = match s { Err(UnexpectedTaskFrame.into()),
TaskStream::None => Ok(Async::Ready(())), TaskStream::Context(ref mut context) => {
TaskStream::Stream(ref mut stream) => self.poll_stream(stream), match context.poll() {
TaskStream::Context(ref mut context) => self.poll_stream(context), Ok(Async::Ready(Some(frame))) => {
}; match frame {
self.stream = s; Frame::Message(msg) => {
result error!("Unexpected message frame {:?}", msg);
Err(UnexpectedTaskFrame.into())
},
Frame::Payload(payload) => {
Ok(Async::Ready(payload))
},
Frame::Drain(fut) => {
self.drain.push(fut);
Ok(Async::NotReady)
}
}
},
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(err) => Err(err),
}
},
}
} }
} }

View File

@ -169,7 +169,7 @@ pub fn handshake<S>(req: &HttpRequest<S>) -> Result<HttpResponse, WsHandshakeErr
.header(header::UPGRADE, "websocket") .header(header::UPGRADE, "websocket")
.header(header::TRANSFER_ENCODING, "chunked") .header(header::TRANSFER_ENCODING, "chunked")
.header(SEC_WEBSOCKET_ACCEPT, key.as_str()) .header(SEC_WEBSOCKET_ACCEPT, key.as_str())
.body(Body::Upgrade).unwrap() .body(Body::UpgradeContext).unwrap()
) )
} }