From cc38b30f7be041b5406a86cd169395f9a27ca15b Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 31 Dec 2017 17:26:32 -0800 Subject: [PATCH] refactor http actor usage --- examples/websocket.rs | 4 +- guide/src/qs_10.md | 2 +- guide/src/qs_14.md | 2 +- guide/src/qs_4.md | 6 +-- guide/src/qs_7.md | 2 +- guide/src/qs_8.md | 8 +-- src/application.rs | 2 +- src/body.rs | 32 +++++------- src/context.rs | 105 +++++++++++++++++++++++++++------------ src/encoding.rs | 29 ++++++----- src/error.rs | 8 --- src/handler.rs | 32 +----------- src/httpcodes.rs | 9 ++-- src/httprequest.rs | 12 +++-- src/httpresponse.rs | 13 +++++ src/json.rs | 2 +- src/middleware/logger.rs | 6 +-- src/payload.rs | 42 ++++++---------- src/pipeline.rs | 97 ++++++++---------------------------- src/router.rs | 5 +- src/test.rs | 7 ++- src/ws.rs | 26 +++++----- 22 files changed, 197 insertions(+), 254 deletions(-) diff --git a/examples/websocket.rs b/examples/websocket.rs index 2a80add1b..3ddd04f58 100644 --- a/examples/websocket.rs +++ b/examples/websocket.rs @@ -13,8 +13,8 @@ use actix_web::*; /// do websocket handshake and start `MyWebSocket` actor -fn ws_index(r: HttpRequest) -> Reply { - ws::start(r, MyWebSocket).into() +fn ws_index(r: HttpRequest) -> Result { + ws::start(r, MyWebSocket) } /// websocket connection is long running connection, it easier diff --git a/guide/src/qs_10.md b/guide/src/qs_10.md index 89bf8c6d3..8974f241d 100644 --- a/guide/src/qs_10.md +++ b/guide/src/qs_10.md @@ -72,7 +72,7 @@ Create `Logger` middleware with the specified `format`. Default `Logger` could be created with `default` method, it uses the default format: ```ignore - %a %t "%r" %s %b "%{Referrer}i" "%{User-Agent}i" %T + %a %t "%r" %s %b "%{Referer}i" "%{User-Agent}i" %T ``` ```rust # extern crate actix_web; diff --git a/guide/src/qs_14.md b/guide/src/qs_14.md index e9e489be7..26fa4ecfb 100644 --- a/guide/src/qs_14.md +++ b/guide/src/qs_14.md @@ -110,7 +110,7 @@ fn index(req: HttpRequest) -> Box> .and_then(|res| { match res { Ok(user) => Ok(httpcodes::HTTPOk.build().json(user)?), - Err(_) => Ok(httpcodes::HTTPInternalServerError.response()) + Err(_) => Ok(httpcodes::HTTPInternalServerError.into()) } }) .responder() diff --git a/guide/src/qs_4.md b/guide/src/qs_4.md index 1a82d51bd..42afb9219 100644 --- a/guide/src/qs_4.md +++ b/guide/src/qs_4.md @@ -65,7 +65,7 @@ impl Handler for MyHandler { /// Handle request fn handle(&mut self, req: HttpRequest) -> Self::Result { self.0 += 1; - httpcodes::HTTPOk.response() + httpcodes::HTTPOk.into() } } # fn main() {} @@ -91,7 +91,7 @@ impl Handler for MyHandler { fn handle(&mut self, req: HttpRequest) -> Self::Result { let num = self.0.load(Ordering::Relaxed) + 1; self.0.store(num, Ordering::Relaxed); - httpcodes::HTTPOk.response() + httpcodes::HTTPOk.into() } } @@ -104,7 +104,7 @@ fn main() { move || { let cloned = inc.clone(); Application::new() - .resource("/", move |r| r.h(MyHandler(cloned))) + .resource("/", move |r| r.h(MyHandler(cloned))) }) .bind("127.0.0.1:8088").unwrap() .start(); diff --git a/guide/src/qs_7.md b/guide/src/qs_7.md index 5d3447514..7cce5932b 100644 --- a/guide/src/qs_7.md +++ b/guide/src/qs_7.md @@ -246,7 +246,7 @@ fn index(mut req: HttpRequest) -> Box> { .from_err() .and_then(|params| { // <- url encoded parameters println!("==== BODY ==== {:?}", params); - ok(httpcodes::HTTPOk.response()) + ok(httpcodes::HTTPOk.into()) }) .responder() } diff --git a/guide/src/qs_8.md b/guide/src/qs_8.md index 7661b9ad4..53713a205 100644 --- a/guide/src/qs_8.md +++ b/guide/src/qs_8.md @@ -20,10 +20,10 @@ use actix_web::test::TestRequest; fn index(req: HttpRequest) -> HttpResponse { if let Some(hdr) = req.headers().get(header::CONTENT_TYPE) { if let Ok(s) = hdr.to_str() { - return httpcodes::HTTPOk.response() + return httpcodes::HTTPOk.into() } } - httpcodes::HTTPBadRequest.response() + httpcodes::HTTPBadRequest.into() } fn main() { @@ -60,7 +60,7 @@ use actix_web::*; use actix_web::test::TestServer; fn index(req: HttpRequest) -> HttpResponse { - httpcodes::HTTPOk.response() + httpcodes::HTTPOk.into() } fn main() { @@ -80,7 +80,7 @@ use actix_web::*; use actix_web::test::TestServer; fn index(req: HttpRequest) -> HttpResponse { - httpcodes::HTTPOk.response() + httpcodes::HTTPOk.into() } /// This function get called by http server. diff --git a/src/application.rs b/src/application.rs index 0fb44bedd..02ae078ba 100644 --- a/src/application.rs +++ b/src/application.rs @@ -128,7 +128,7 @@ impl Application where S: 'static { } } - /// Set application prefix. + /// Set application prefix /// /// Only requests that matches application's prefix get processed by this application. /// Application prefix always contains laading "/" slash. If supplied prefix diff --git a/src/body.rs b/src/body.rs index b9e6676e7..53af6e40e 100644 --- a/src/body.rs +++ b/src/body.rs @@ -5,6 +5,7 @@ use bytes::{Bytes, BytesMut}; use futures::Stream; use error::Error; +use context::ActorHttpContext; /// Type represent streaming body pub type BodyStream = Box>; @@ -18,12 +19,8 @@ pub enum Body { /// Unspecified streaming response. Developer is responsible for setting /// right `Content-Length` or `Transfer-Encoding` headers. Streaming(BodyStream), - /// Upgrade connection. - Upgrade(BodyStream), - /// Special body type for actor streaming response. - StreamingContext, - /// Special body type for actor upgrade response. - UpgradeContext, + /// Special body type for actor response. + Actor(Box), } /// Represents various types of binary body. @@ -51,8 +48,7 @@ impl Body { #[inline] pub fn is_streaming(&self) -> bool { match *self { - Body::Streaming(_) | Body::StreamingContext - | Body::Upgrade(_) | Body::UpgradeContext => true, + Body::Streaming(_) | Body::Actor(_) => true, _ => false } } @@ -83,15 +79,7 @@ impl PartialEq for Body { Body::Binary(ref b2) => b == b2, _ => false, }, - Body::StreamingContext => match *other { - Body::StreamingContext => true, - _ => false, - }, - Body::UpgradeContext => match *other { - Body::UpgradeContext => true, - _ => false, - }, - Body::Streaming(_) | Body::Upgrade(_) => false, + Body::Streaming(_) | Body::Actor(_) => false, } } } @@ -102,9 +90,7 @@ impl fmt::Debug for Body { Body::Empty => write!(f, "Body::Empty"), Body::Binary(ref b) => write!(f, "Body::Binary({:?})", b), Body::Streaming(_) => write!(f, "Body::Streaming(_)"), - Body::Upgrade(_) => write!(f, "Body::Upgrade(_)"), - Body::StreamingContext => write!(f, "Body::StreamingContext"), - Body::UpgradeContext => write!(f, "Body::UpgradeContext"), + Body::Actor(_) => write!(f, "Body::Actor(_)"), } } } @@ -115,6 +101,12 @@ impl From for Body where T: Into{ } } +impl From> for Body { + fn from(ctx: Box) -> Body { + Body::Actor(ctx) + } +} + impl Binary { #[inline] pub fn is_empty(&self) -> bool { diff --git a/src/context.rs b/src/context.rs index f0e80de1a..1cdb6b9b4 100644 --- a/src/context.rs +++ b/src/context.rs @@ -1,6 +1,7 @@ use std; +use std::marker::PhantomData; use std::collections::VecDeque; -use futures::{Async, Poll}; +use futures::{Async, Future, Poll}; use futures::sync::oneshot::Sender; use futures::unsync::oneshot; @@ -11,19 +12,18 @@ use actix::dev::{AsyncContextApi, ActorAddressCell, ActorItemsCell, ActorWaitCel Envelope, ToEnvelope, RemoteEnvelope}; use body::{Body, Binary}; -use error::Error; +use error::{Error, Result}; use httprequest::HttpRequest; use httpresponse::HttpResponse; -pub(crate) trait IoContext: 'static { +pub trait ActorHttpContext: 'static { fn disconnected(&mut self); fn poll(&mut self) -> Poll, Error>; } #[derive(Debug)] -pub(crate) enum Frame { - Message(HttpResponse), +pub enum Frame { Payload(Option), Drain(oneshot::Sender<()>), } @@ -31,7 +31,7 @@ pub(crate) enum Frame { /// Http actor execution context pub struct HttpContext where A: Actor>, { - act: A, + act: Option, state: ActorState, modified: bool, items: ActorItemsCell, @@ -39,7 +39,6 @@ pub struct HttpContext where A: Actor>, stream: VecDeque, wait: ActorWaitCell, request: HttpRequest, - streaming: bool, disconnected: bool, } @@ -101,12 +100,15 @@ impl AsyncContextApi for HttpContext where A: Actor } } -impl HttpContext where A: Actor { +impl HttpContext where A: Actor { - pub fn new(req: HttpRequest, actor: A) -> HttpContext - { + pub fn new(req: HttpRequest, actor: A) -> HttpContext { + HttpContext::from_request(req).actor(actor) + } + + pub fn from_request(req: HttpRequest) -> HttpContext { HttpContext { - act: actor, + act: None, state: ActorState::Started, modified: false, items: ActorItemsCell::default(), @@ -114,10 +116,24 @@ impl HttpContext where A: Actor { wait: ActorWaitCell::default(), stream: VecDeque::new(), request: req, - streaming: false, disconnected: false, } } + + pub fn actor(mut self, actor: A) -> HttpContext { + self.act = Some(actor); + self + } + + pub fn with_actor(mut self, actor: A, mut resp: HttpResponse) -> Result { + if self.act.is_some() { + panic!("Actor is set already"); + } + self.act = Some(actor); + + resp.replace_body(Body::Actor(Box::new(self))); + Ok(resp) + } } impl HttpContext where A: Actor { @@ -132,24 +148,12 @@ impl HttpContext where A: Actor { &mut self.request } - /// Send response to peer - pub fn start>(&mut self, response: R) { - let resp = response.into(); - match *resp.body() { - Body::StreamingContext | Body::UpgradeContext => self.streaming = true, - _ => (), - } - self.stream.push_back(Frame::Message(resp)) - } - /// Write payload pub fn write>(&mut self, data: B) { - if self.streaming { - if !self.disconnected { - self.stream.push_back(Frame::Payload(Some(data.into()))) - } + if !self.disconnected { + self.stream.push_back(Frame::Payload(Some(data.into()))); } else { - warn!("Trying to write response body for non-streaming response"); + warn!("Trying to write to disconnected response"); } } @@ -159,11 +163,11 @@ impl HttpContext where A: Actor { } /// Returns drain future - pub fn drain(&mut self) -> oneshot::Receiver<()> { + pub fn drain(&mut self) -> Drain { let (tx, rx) = oneshot::channel(); self.modified = true; self.stream.push_back(Frame::Drain(tx)); - rx + Drain::new(rx) } /// Check if connection still open @@ -193,7 +197,7 @@ impl HttpContext where A: Actor { } } -impl IoContext for HttpContext where A: Actor, S: 'static { +impl ActorHttpContext for HttpContext where A: Actor, S: 'static { fn disconnected(&mut self) { self.items.stop(); @@ -204,8 +208,11 @@ impl IoContext for HttpContext where A: Actor, S: 'sta } fn poll(&mut self) -> Poll, Error> { + if self.act.is_none() { + return Ok(Async::Ready(None)) + } let act: &mut A = unsafe { - std::mem::transmute(&mut self.act as &mut A) + std::mem::transmute(self.act.as_mut().unwrap() as &mut A) }; let ctx: &mut HttpContext = unsafe { std::mem::transmute(self as &mut HttpContext) @@ -303,3 +310,39 @@ impl ToEnvelope for HttpContext RemoteEnvelope::new(msg, tx).into() } } + +impl From> for Body + where A: Actor>, + S: 'static +{ + fn from(ctx: HttpContext) -> Body { + Body::Actor(Box::new(ctx)) + } +} + +pub struct Drain { + fut: oneshot::Receiver<()>, + _a: PhantomData, +} + +impl Drain { + fn new(fut: oneshot::Receiver<()>) -> Self { + Drain { + fut: fut, + _a: PhantomData + } + } +} + +impl ActorFuture for Drain { + type Item = (); + type Error = (); + type Actor = A; + + fn poll(&mut self, + _: &mut A, + _: &mut ::Context) -> Poll + { + self.fut.poll().map_err(|_| ()) + } +} diff --git a/src/encoding.rs b/src/encoding.rs index 3254f6404..e30ba9f40 100644 --- a/src/encoding.rs +++ b/src/encoding.rs @@ -416,8 +416,20 @@ impl PayloadEncoder { resp.headers_mut().remove(CONTENT_LENGTH); TransferEncoding::eof(buf) } - Body::Streaming(_) | Body::StreamingContext => { - if resp.chunked() { + Body::Streaming(_) | Body::Actor(_) => { + if resp.upgrade() { + if version == Version::HTTP_2 { + error!("Connection upgrade is forbidden for HTTP/2"); + } else { + resp.headers_mut().insert( + CONNECTION, HeaderValue::from_static("upgrade")); + } + if encoding != ContentEncoding::Identity { + encoding = ContentEncoding::Identity; + resp.headers_mut().remove(CONTENT_ENCODING); + } + TransferEncoding::eof(buf) + } else if resp.chunked() { resp.headers_mut().remove(CONTENT_LENGTH); if version != Version::HTTP_11 { error!("Chunked transfer encoding is forbidden for {:?}", version); @@ -446,19 +458,6 @@ impl PayloadEncoder { TransferEncoding::eof(buf) } } - Body::Upgrade(_) | Body::UpgradeContext => { - if version == Version::HTTP_2 { - error!("Connection upgrade is forbidden for HTTP/2"); - } else { - resp.headers_mut().insert( - CONNECTION, HeaderValue::from_static("upgrade")); - } - if encoding != ContentEncoding::Identity { - encoding = ContentEncoding::Identity; - resp.headers_mut().remove(CONTENT_ENCODING); - } - TransferEncoding::eof(buf) - } }; resp.replace_body(body); diff --git a/src/error.rs b/src/error.rs index ef4b65c56..31ae69fbb 100644 --- a/src/error.rs +++ b/src/error.rs @@ -114,14 +114,6 @@ impl ResponseError for header::InvalidHeaderValue {} /// `InternalServerError` for `futures::Canceled` impl ResponseError for Canceled {} -/// Internal error -#[doc(hidden)] -#[derive(Fail, Debug)] -#[fail(display="Unexpected task frame")] -pub struct UnexpectedTaskFrame; - -impl ResponseError for UnexpectedTaskFrame {} - /// A set of errors that can occur during parsing HTTP streams #[derive(Fail, Debug)] pub enum ParseError { diff --git a/src/handler.rs b/src/handler.rs index deab468ea..106fecc8e 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -1,13 +1,11 @@ use std::marker::PhantomData; -use actix::Actor; -use futures::future::{Future, ok, err}; use regex::Regex; +use futures::future::{Future, ok, err}; use http::{header, StatusCode, Error as HttpError}; use body::Body; use error::Error; -use context::{HttpContext, IoContext}; use httprequest::HttpRequest; use httpresponse::HttpResponse; @@ -69,20 +67,11 @@ pub struct Reply(ReplyItem); pub(crate) enum ReplyItem { Message(HttpResponse), - Actor(Box), Future(Box>), } impl Reply { - /// Create actor response - #[inline] - pub fn actor(ctx: HttpContext) -> Reply - where A: Actor>, S: 'static - { - Reply(ReplyItem::Actor(Box::new(ctx))) - } - /// Create async response #[inline] pub fn async(fut: F) -> Reply @@ -163,25 +152,6 @@ impl> From> for Reply { } } -impl>, S: 'static> Responder for HttpContext -{ - type Item = Reply; - type Error = Error; - - #[inline] - fn respond_to(self, _: HttpRequest) -> Result { - Ok(Reply(ReplyItem::Actor(Box::new(self)))) - } -} - -impl>, S: 'static> From> for Reply { - - #[inline] - fn from(ctx: HttpContext) -> Reply { - Reply(ReplyItem::Actor(Box::new(ctx))) - } -} - impl Responder for Box> where I: Responder + 'static, E: Into + 'static diff --git a/src/httpcodes.rs b/src/httpcodes.rs index abe226cbe..e27c56237 100644 --- a/src/httpcodes.rs +++ b/src/httpcodes.rs @@ -54,9 +54,6 @@ impl StaticResponse { pub fn build(&self) -> HttpResponseBuilder { HttpResponse::build(self.0) } - pub fn response(&self) -> HttpResponse { - HttpResponse::new(self.0, Body::Empty) - } pub fn with_reason(self, reason: &'static str) -> HttpResponse { let mut resp = HttpResponse::new(self.0, Body::Empty); resp.set_reason(reason); @@ -92,7 +89,7 @@ impl Responder for StaticResponse { impl From for HttpResponse { fn from(st: StaticResponse) -> Self { - st.response() + HttpResponse::new(st.0, Body::Empty) } } @@ -153,7 +150,7 @@ mod tests { #[test] fn test_response() { - let resp = HTTPOk.response(); + let resp: HttpResponse = HTTPOk.into(); assert_eq!(resp.status(), StatusCode::OK); } @@ -165,7 +162,7 @@ mod tests { #[test] fn test_with_reason() { - let resp = HTTPOk.response(); + let resp: HttpResponse = HTTPOk.into(); assert_eq!(resp.reason(), "OK"); let resp = HTTPBadRequest.with_reason("test"); diff --git a/src/httprequest.rs b/src/httprequest.rs index 96c36dbb3..dd8de37dc 100644 --- a/src/httprequest.rs +++ b/src/httprequest.rs @@ -135,6 +135,12 @@ impl HttpRequest<()> { impl HttpRequest { + #[inline] + /// Construct new http request with state. + pub fn change_state(self, state: Rc) -> HttpRequest { + HttpRequest(self.0, Some(state), self.2.clone()) + } + #[inline] /// Construct new http request without state. pub(crate) fn clone_without_state(&self) -> HttpRequest { @@ -447,7 +453,7 @@ impl HttpRequest { /// } /// }) /// .finish() // <- Stream::finish() combinator from actix - /// .map(|_| httpcodes::HTTPOk.response()) + /// .map(|_| httpcodes::HTTPOk.into()) /// .responder() /// } /// # fn main() {} @@ -477,7 +483,7 @@ impl HttpRequest { /// .from_err() /// .and_then(|params| { // <- url encoded parameters /// println!("==== BODY ==== {:?}", params); - /// ok(httpcodes::HTTPOk.response()) + /// ok(httpcodes::HTTPOk.into()) /// }) /// .responder() /// } @@ -512,7 +518,7 @@ impl HttpRequest { /// .from_err() /// .and_then(|val: MyObj| { // <- deserialized value /// println!("==== BODY ==== {:?}", val); - /// Ok(httpcodes::HTTPOk.response()) + /// Ok(httpcodes::HTTPOk.into()) /// }).responder() /// } /// # fn main() {} diff --git a/src/httpresponse.rs b/src/httpresponse.rs index 69bb71a72..5d5e85fcb 100644 --- a/src/httpresponse.rs +++ b/src/httpresponse.rs @@ -138,6 +138,7 @@ impl HttpResponse { } /// Connection upgrade status + #[inline] pub fn upgrade(&self) -> bool { self.get_ref().connection_type == Some(ConnectionType::Upgrade) } @@ -155,11 +156,13 @@ impl HttpResponse { } /// is chunked encoding enabled + #[inline] pub fn chunked(&self) -> bool { self.get_ref().chunked } /// Content encoding + #[inline] pub fn content_encoding(&self) -> &ContentEncoding { &self.get_ref().encoding } @@ -171,6 +174,7 @@ impl HttpResponse { } /// Get body os this response + #[inline] pub fn body(&self) -> &Body { &self.get_ref().body } @@ -443,6 +447,15 @@ impl HttpResponseBuilder { pub fn finish(&mut self) -> Result { self.body(Body::Empty) } + + /// This method construct new `HttpResponseBuilder` + pub fn take(&mut self) -> HttpResponseBuilder { + HttpResponseBuilder { + response: self.response.take(), + err: self.err.take(), + cookies: self.cookies.take(), + } + } } #[inline] diff --git a/src/json.rs b/src/json.rs index a33fa46d6..263f6028f 100644 --- a/src/json.rs +++ b/src/json.rs @@ -71,7 +71,7 @@ impl Responder for Json { /// .from_err() /// .and_then(|val: MyObj| { // <- deserialized value /// println!("==== BODY ==== {:?}", val); -/// Ok(httpcodes::HTTPOk.response()) +/// Ok(httpcodes::HTTPOk.into()) /// }).responder() /// } /// # fn main() {} diff --git a/src/middleware/logger.rs b/src/middleware/logger.rs index e498ad4c9..ac1d6cc47 100644 --- a/src/middleware/logger.rs +++ b/src/middleware/logger.rs @@ -19,7 +19,7 @@ use middleware::{Middleware, Started, Finished}; /// Default `Logger` could be created with `default` method, it uses the default format: /// /// ```ignore -/// %a %t "%r" %s %b "%{Referrer}i" "%{User-Agent}i" %T +/// %a %t "%r" %s %b "%{Referer}i" "%{User-Agent}i" %T /// ``` /// ```rust /// # extern crate actix_web; @@ -75,7 +75,7 @@ impl Default for Logger { /// Create `Logger` middleware with format: /// /// ```ignore - /// %a %t "%r" %s %b "%{Referrer}i" "%{User-Agent}i" %T + /// %a %t "%r" %s %b "%{Referer}i" "%{User-Agent}i" %T /// ``` fn default() -> Logger { Logger { format: Format::default() } @@ -121,7 +121,7 @@ struct Format(Vec); impl Default for Format { /// Return the default formatting style for the `Logger`: fn default() -> Format { - Format::new(r#"%a %t "%r" %s %b "%{Referrer}i" "%{User-Agent}i" %T"#) + Format::new(r#"%a %t "%r" %s %b "%{Referer}i" "%{User-Agent}i" %T"#) } } diff --git a/src/payload.rs b/src/payload.rs index 7c921070c..df2e4f7fb 100644 --- a/src/payload.rs +++ b/src/payload.rs @@ -9,20 +9,14 @@ use futures::{Future, Async, Poll, Stream}; use futures::task::{Task, current as current_task}; use body::BodyStream; -use actix::ResponseType; use error::PayloadError; pub(crate) const DEFAULT_BUFFER_SIZE: usize = 65_536; // max buffer size 64k /// Just Bytes object -#[derive(PartialEq)] +#[derive(PartialEq, Message)] pub struct PayloadItem(pub Bytes); -impl ResponseType for PayloadItem { - type Item = (); - type Error = (); -} - impl Deref for PayloadItem { type Target = Bytes; @@ -91,27 +85,27 @@ impl Payload { } /// Get first available chunk of data. - pub fn readany(&mut self) -> ReadAny { + pub fn readany(&self) -> ReadAny { ReadAny(Rc::clone(&self.inner)) } /// Get exact number of bytes - pub fn readexactly(&mut self, size: usize) -> ReadExactly { + pub fn readexactly(&self, size: usize) -> ReadExactly { ReadExactly(Rc::clone(&self.inner), size) } /// Read until `\n` - pub fn readline(&mut self) -> ReadLine { + pub fn readline(&self) -> ReadLine { ReadLine(Rc::clone(&self.inner)) } /// Read until match line - pub fn readuntil(&mut self, line: &[u8]) -> ReadUntil { + pub fn readuntil(&self, line: &[u8]) -> ReadUntil { ReadUntil(Rc::clone(&self.inner), line.to_vec()) } #[doc(hidden)] - pub fn readall(&mut self) -> Option { + pub fn readall(&self) -> Option { self.inner.borrow_mut().readall() } @@ -132,20 +126,16 @@ impl Payload { /// Convert payload into compatible `HttpResponse` body stream pub fn stream(self) -> BodyStream { - Box::new(self.map_err(|e| e.into())) + Box::new(self.map(|i| i.0).map_err(|e| e.into())) } } impl Stream for Payload { - type Item = Bytes; + type Item = PayloadItem; type Error = PayloadError; - fn poll(&mut self) -> Poll, PayloadError> { - match self.inner.borrow_mut().readany()? { - Async::Ready(Some(item)) => Ok(Async::Ready(Some(item.0))), - Async::Ready(None) => Ok(Async::Ready(None)), - Async::NotReady => Ok(Async::NotReady), - } + fn poll(&mut self) -> Poll, PayloadError> { + self.inner.borrow_mut().readany() } } @@ -474,7 +464,7 @@ mod tests { #[test] fn test_basic() { Core::new().unwrap().run(lazy(|| { - let (_, mut payload) = Payload::new(false); + let (_, payload) = Payload::new(false); assert!(!payload.eof()); assert!(payload.is_empty()); @@ -489,7 +479,7 @@ mod tests { #[test] fn test_eof() { Core::new().unwrap().run(lazy(|| { - let (mut sender, mut payload) = Payload::new(false); + let (mut sender, payload) = Payload::new(false); assert_eq!(Async::NotReady, payload.readany().poll().ok().unwrap()); assert!(!payload.eof()); @@ -514,7 +504,7 @@ mod tests { #[test] fn test_err() { Core::new().unwrap().run(lazy(|| { - let (mut sender, mut payload) = Payload::new(false); + let (mut sender, payload) = Payload::new(false); assert_eq!(Async::NotReady, payload.readany().poll().ok().unwrap()); @@ -528,7 +518,7 @@ mod tests { #[test] fn test_readany() { Core::new().unwrap().run(lazy(|| { - let (mut sender, mut payload) = Payload::new(false); + let (mut sender, payload) = Payload::new(false); sender.feed_data(Bytes::from("line1")); @@ -552,7 +542,7 @@ mod tests { #[test] fn test_readexactly() { Core::new().unwrap().run(lazy(|| { - let (mut sender, mut payload) = Payload::new(false); + let (mut sender, payload) = Payload::new(false); assert_eq!(Async::NotReady, payload.readexactly(2).poll().ok().unwrap()); @@ -579,7 +569,7 @@ mod tests { #[test] fn test_readuntil() { Core::new().unwrap().run(lazy(|| { - let (mut sender, mut payload) = Payload::new(false); + let (mut sender, payload) = Payload::new(false); assert_eq!(Async::NotReady, payload.readuntil(b"ne").poll().ok().unwrap()); diff --git a/src/pipeline.rs b/src/pipeline.rs index f37065f01..f5f4799c4 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -8,8 +8,8 @@ use futures::unsync::oneshot; use channel::HttpHandlerTask; use body::{Body, BodyStream}; -use context::{Frame, IoContext}; -use error::{Error, UnexpectedTaskFrame}; +use context::{Frame, ActorHttpContext}; +use error::Error; use handler::{Reply, ReplyItem}; use h1writer::{Writer, WriterState}; use httprequest::HttpRequest; @@ -38,7 +38,7 @@ struct PipelineInfo { req: HttpRequest, count: usize, mws: Rc>>>, - context: Option>, + context: Option>, error: Option, } @@ -72,12 +72,6 @@ impl PipelineInfo { } } -enum PipelineResponse { - None, - Context(Box), - Response(Box>), -} - impl> Pipeline { pub fn new(req: HttpRequest, @@ -364,7 +358,7 @@ impl> StartMiddlewares { // waiting for response struct WaitingResponse { - stream: PipelineResponse, + fut: Box>, _s: PhantomData, _h: PhantomData, } @@ -377,65 +371,22 @@ impl WaitingResponse { match reply.into() { ReplyItem::Message(resp) => RunMiddlewares::init(info, resp), - ReplyItem::Actor(ctx) => - PipelineState::Handler( - WaitingResponse { stream: PipelineResponse::Context(ctx), - _s: PhantomData, _h: PhantomData }), ReplyItem::Future(fut) => PipelineState::Handler( - WaitingResponse { stream: PipelineResponse::Response(fut), - _s: PhantomData, _h: PhantomData }), + WaitingResponse { fut: fut, _s: PhantomData, _h: PhantomData }), } } fn poll(mut self, info: &mut PipelineInfo) -> Result, PipelineState> { - let stream = mem::replace(&mut self.stream, PipelineResponse::None); - - match stream { - PipelineResponse::Context(mut context) => { - loop { - match context.poll() { - Ok(Async::Ready(Some(frame))) => { - match frame { - Frame::Message(resp) => { - info.context = Some(context); - return Ok(RunMiddlewares::init(info, resp)) - } - Frame::Payload(_) | Frame::Drain(_) => (), - } - }, - Ok(Async::Ready(None)) => { - error!("Unexpected eof"); - let err: Error = UnexpectedTaskFrame.into(); - return Ok(ProcessResponse::init(err.into())) - }, - Ok(Async::NotReady) => { - self.stream = PipelineResponse::Context(context); - return Err(PipelineState::Handler(self)) - }, - Err(err) => - return Ok(ProcessResponse::init(err.into())) - } - } - }, - PipelineResponse::Response(mut fut) => { - match fut.poll() { - Ok(Async::NotReady) => { - self.stream = PipelineResponse::Response(fut); - Err(PipelineState::Handler(self)) - } - Ok(Async::Ready(response)) => - Ok(RunMiddlewares::init(info, response)), - Err(err) => - Ok(ProcessResponse::init(err.into())), - } - } - PipelineResponse::None => { - unreachable!("Broken internal state") - } + match self.fut.poll() { + Ok(Async::NotReady) => + Err(PipelineState::Handler(self)), + Ok(Async::Ready(response)) => + Ok(RunMiddlewares::init(info, response)), + Err(err) => + Ok(ProcessResponse::init(err.into())), } - } } @@ -554,7 +505,7 @@ impl RunningState { enum IOState { Response, Payload(BodyStream), - Context, + Actor(Box), Done, } @@ -588,10 +539,10 @@ impl ProcessResponse { }; match self.resp.replace_body(Body::Empty) { - Body::Streaming(stream) | Body::Upgrade(stream) => + Body::Streaming(stream) => self.iostate = IOState::Payload(stream), - Body::StreamingContext | Body::UpgradeContext => - self.iostate = IOState::Context, + Body::Actor(ctx) => + self.iostate = IOState::Actor(ctx), _ => (), } @@ -640,17 +591,12 @@ impl ProcessResponse { } } }, - IOState::Context => { - match info.context.as_mut().unwrap().poll() { + IOState::Actor(mut ctx) => { + match ctx.poll() { Ok(Async::Ready(Some(frame))) => { match frame { - Frame::Message(msg) => { - error!("Unexpected message frame {:?}", msg); - info.error = Some(UnexpectedTaskFrame.into()); - return Ok( - FinishingMiddlewares::init(info, self.resp)) - }, Frame::Payload(None) => { + info.context = Some(ctx); self.iostate = IOState::Done; if let Err(err) = io.write_eof() { info.error = Some(err.into()); @@ -660,7 +606,7 @@ impl ProcessResponse { break }, Frame::Payload(Some(chunk)) => { - self.iostate = IOState::Context; + self.iostate = IOState::Actor(ctx); match io.write(chunk.as_ref()) { Err(err) => { info.error = Some(err.into()); @@ -678,11 +624,10 @@ impl ProcessResponse { }, Ok(Async::Ready(None)) => { self.iostate = IOState::Done; - info.context.take(); break } Ok(Async::NotReady) => { - self.iostate = IOState::Context; + self.iostate = IOState::Actor(ctx); break } Err(err) => { diff --git a/src/router.rs b/src/router.rs index eb1027fb3..ebd763bfd 100644 --- a/src/router.rs +++ b/src/router.rs @@ -190,7 +190,7 @@ impl Pattern { } /// Extract pattern parameters from the text - pub(crate) fn update_match_info(&self, text: &str, req: &mut HttpRequest) { + pub fn update_match_info(&self, text: &str, req: &mut HttpRequest) { if !self.names.is_empty() { if let Some(captures) = self.re.captures(text) { let mut idx = 0; @@ -208,8 +208,7 @@ impl Pattern { } /// Build pattern path. - pub fn path(&self, prefix: Option<&str>, elements: U) - -> Result + pub fn path(&self, prefix: Option<&str>, elements: U) -> Result where U: IntoIterator, I: AsRef, { diff --git a/src/test.rs b/src/test.rs index 88c2044f5..a0d8a8de5 100644 --- a/src/test.rs +++ b/src/test.rs @@ -41,7 +41,7 @@ use httpresponse::HttpResponse; /// # extern crate reqwest; /// # /// # fn my_handler(req: HttpRequest) -> HttpResponse { -/// # httpcodes::HTTPOk.response() +/// # httpcodes::HTTPOk.into() /// # } /// # /// # fn main() { @@ -228,9 +228,9 @@ impl Iterator for TestApp { /// /// fn index(req: HttpRequest) -> HttpResponse { /// if let Some(hdr) = req.headers().get(header::CONTENT_TYPE) { -/// httpcodes::HTTPOk.response() +/// httpcodes::HTTPOk.into() /// } else { -/// httpcodes::HTTPBadRequest.response() +/// httpcodes::HTTPBadRequest.into() /// } /// } /// @@ -365,7 +365,6 @@ impl TestRequest { Ok(resp) => { match resp.into().into() { ReplyItem::Message(resp) => Ok(resp), - ReplyItem::Actor(_) => panic!("Actor handler is not supported."), ReplyItem::Future(_) => panic!("Async handler is not supported."), } }, diff --git a/src/ws.rs b/src/ws.rs index 5832e5c29..69e74aadb 100644 --- a/src/ws.rs +++ b/src/ws.rs @@ -12,7 +12,7 @@ //! use actix_web::*; //! //! // do websocket handshake and start actor -//! fn ws_index(req: HttpRequest) -> Result { +//! fn ws_index(req: HttpRequest) -> Result { //! ws::start(req, Ws) //! } //! @@ -52,13 +52,11 @@ use futures::{Async, Poll, Stream}; use actix::{Actor, AsyncContext, ResponseType, StreamHandler}; -use body::Body; -use context::HttpContext; -use handler::Reply; use payload::ReadAny; use error::{Error, WsHandshakeError}; +use context::HttpContext; use httprequest::HttpRequest; -use httpresponse::{ConnectionType, HttpResponse}; +use httpresponse::{ConnectionType, HttpResponse, HttpResponseBuilder}; use wsframe; use wsproto::*; @@ -88,17 +86,17 @@ impl ResponseType for Message { } /// Do websocket handshake and start actor -pub fn start(mut req: HttpRequest, actor: A) -> Result +pub fn start(mut req: HttpRequest, actor: A) -> Result where A: Actor> + StreamHandler, S: 'static { - let resp = handshake(&req)?; - + let mut resp = handshake(&req)?; let stream = WsStream::new(req.payload_mut().readany()); + let mut ctx = HttpContext::new(req, actor); - ctx.start(resp); ctx.add_stream(stream); - Ok(ctx.into()) + + Ok(resp.body(ctx)?) } /// Prepare `WebSocket` handshake response. @@ -109,7 +107,7 @@ pub fn start(mut req: HttpRequest, actor: A) -> Result // /// `protocols` is a sequence of known protocols. On successful handshake, // /// the returned response headers contain the first protocol in this list // /// which the server also knows. -pub fn handshake(req: &HttpRequest) -> Result { +pub fn handshake(req: &HttpRequest) -> Result { // WebSocket accepts only GET if *req.method() != Method::GET { return Err(WsHandshakeError::GetMethodRequired) @@ -163,8 +161,7 @@ pub fn handshake(req: &HttpRequest) -> Result