diff --git a/Cargo.toml b/Cargo.toml index ba48f1c7c..dd7dc5e64 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -99,3 +99,17 @@ version_check = "0.1" lto = true opt-level = 3 # debug = true + +[workspace] +members = [ + "./", + "examples/basic", + "examples/diesel", + "examples/json", + "examples/multipart", + "examples/signals", + "examples/state", + "examples/template_tera", + "examples/tls", + "examples/websocket-chat", +] diff --git a/examples/basic/Cargo.toml b/examples/basic/Cargo.toml index 6bb442e41..d4fc56662 100644 --- a/examples/basic/Cargo.toml +++ b/examples/basic/Cargo.toml @@ -2,6 +2,7 @@ name = "basic" version = "0.1.0" authors = ["Nikolay Kim "] +workspace = "../.." [dependencies] futures = "*" diff --git a/examples/basic/src/main.rs b/examples/basic/src/main.rs index 9895ac946..f3f519807 100644 --- a/examples/basic/src/main.rs +++ b/examples/basic/src/main.rs @@ -9,9 +9,10 @@ use futures::Stream; use actix::*; use actix_web::*; -#[cfg(target_os = "linux")] use actix::actors::signal::{ProcessSignals, Subscribe}; use actix_web::middleware::RequestSession; use futures::future::{FutureResult, result}; +#[cfg(unix)] +use actix::actors::signal::{ProcessSignals, Subscribe}; /// simple handler fn index(mut req: HttpRequest) -> Result { @@ -96,7 +97,9 @@ fn main() { .bind("0.0.0.0:8080").unwrap() .start(); - if cfg!(target_os = "linux") { // Subscribe to unix signals + // Subscribe to unix signals + #[cfg(unix)] + { let signals = Arbiter::system_registry().get::(); signals.send(Subscribe(addr.subscriber())); } diff --git a/examples/diesel/Cargo.toml b/examples/diesel/Cargo.toml index e5bf2bb4d..31c8c4068 100644 --- a/examples/diesel/Cargo.toml +++ b/examples/diesel/Cargo.toml @@ -2,6 +2,7 @@ name = "diesel-example" version = "0.1.0" authors = ["Nikolay Kim "] +workspace = "../.." [dependencies] env_logger = "0.4" diff --git a/examples/diesel/src/main.rs b/examples/diesel/src/main.rs index 7303690e6..a5b25b21d 100644 --- a/examples/diesel/src/main.rs +++ b/examples/diesel/src/main.rs @@ -45,7 +45,7 @@ fn index(req: HttpRequest) -> Box> .and_then(|res| { match res { Ok(user) => Ok(httpcodes::HTTPOk.build().json(user)?), - Err(_) => Ok(httpcodes::HTTPInternalServerError.response()) + Err(_) => Ok(httpcodes::HTTPInternalServerError.into()) } }) .responder() diff --git a/examples/json/Cargo.toml b/examples/json/Cargo.toml index 7eb3155e7..e4d7ed8fc 100644 --- a/examples/json/Cargo.toml +++ b/examples/json/Cargo.toml @@ -2,6 +2,7 @@ name = "json-example" version = "0.1.0" authors = ["Nikolay Kim "] +workspace = "../.." [dependencies] bytes = "0.4" diff --git a/examples/json/src/main.rs b/examples/json/src/main.rs index 907eaf51b..296d4a4bc 100644 --- a/examples/json/src/main.rs +++ b/examples/json/src/main.rs @@ -9,7 +9,8 @@ extern crate serde_json; use actix::*; use actix_web::*; -#[cfg(target_os = "linux")] use actix::actors::signal::{ProcessSignals, Subscribe}; +#[cfg(unix)] +use actix::actors::signal::{ProcessSignals, Subscribe}; use bytes::BytesMut; use futures::{Future, Stream}; @@ -96,7 +97,9 @@ fn main() { .shutdown_timeout(1) .start(); - if cfg!(target_os = "linux") { // Subscribe to unix signals + // Subscribe to unix signals + #[cfg(unix)] + { let signals = Arbiter::system_registry().get::(); signals.send(Subscribe(addr.subscriber())); } diff --git a/examples/multipart/Cargo.toml b/examples/multipart/Cargo.toml index 049ca76c5..7a92f465c 100644 --- a/examples/multipart/Cargo.toml +++ b/examples/multipart/Cargo.toml @@ -2,6 +2,7 @@ name = "multipart-example" version = "0.1.0" authors = ["Nikolay Kim "] +workspace = "../.." [[bin]] name = "multipart" diff --git a/examples/multipart/src/main.rs b/examples/multipart/src/main.rs index 84259ec1a..407365cd6 100644 --- a/examples/multipart/src/main.rs +++ b/examples/multipart/src/main.rs @@ -6,7 +6,8 @@ extern crate futures; use actix::*; use actix_web::*; -#[cfg(target_os = "linux")] use actix::actors::signal::{ProcessSignals, Subscribe}; +#[cfg(unix)] +use actix::actors::signal::{ProcessSignals, Subscribe}; use futures::{Future, Stream}; use futures::future::{result, Either}; @@ -39,7 +40,7 @@ fn index(mut req: HttpRequest) -> Box> } }) .finish() // <- Stream::finish() combinator from actix - .map(|_| httpcodes::HTTPOk.response()) + .map(|_| httpcodes::HTTPOk.into()) .responder() } @@ -55,7 +56,9 @@ fn main() { .bind("127.0.0.1:8080").unwrap() .start(); - if cfg!(target_os = "linux") { // Subscribe to unix signals + // Subscribe to unix signals + #[cfg(unix)] + { let signals = Arbiter::system_registry().get::(); signals.send(Subscribe(addr.subscriber())); } diff --git a/examples/signals/Cargo.toml b/examples/signals/Cargo.toml index 869dc66e7..9352ef5e7 100644 --- a/examples/signals/Cargo.toml +++ b/examples/signals/Cargo.toml @@ -2,6 +2,7 @@ name = "signals" version = "0.1.0" authors = ["Nikolay Kim "] +workspace = "../.." [[bin]] name = "server" diff --git a/examples/signals/src/main.rs b/examples/signals/src/main.rs index 2571fdb4f..7f939081a 100644 --- a/examples/signals/src/main.rs +++ b/examples/signals/src/main.rs @@ -5,7 +5,8 @@ extern crate env_logger; use actix::*; use actix_web::*; -#[cfg(target_os = "linux")] use actix::actors::signal::{ProcessSignals, Subscribe}; +#[cfg(unix)] +use actix::actors::signal::{ProcessSignals, Subscribe}; struct MyWebSocket; @@ -34,7 +35,9 @@ fn main() { .bind("127.0.0.1:8080").unwrap() .start(); - if cfg!(target_os = "linux") { // Subscribe to unix signals + // Subscribe to unix signals + #[cfg(unix)] + { let signals = Arbiter::system_registry().get::(); signals.send(Subscribe(addr.subscriber())); } diff --git a/examples/state/Cargo.toml b/examples/state/Cargo.toml index c71fc86f8..7e4c7d3dd 100644 --- a/examples/state/Cargo.toml +++ b/examples/state/Cargo.toml @@ -2,6 +2,7 @@ name = "state" version = "0.1.0" authors = ["Nikolay Kim "] +workspace = "../.." [dependencies] futures = "*" diff --git a/examples/state/src/main.rs b/examples/state/src/main.rs index c713e68ec..68e989bf3 100644 --- a/examples/state/src/main.rs +++ b/examples/state/src/main.rs @@ -7,11 +7,14 @@ extern crate actix; extern crate actix_web; extern crate env_logger; -use actix::*; -use actix_web::*; -#[cfg(target_os = "linux")] use actix::actors::signal::{ProcessSignals, Subscribe}; use std::cell::Cell; +use actix::*; +use actix_web::*; +#[cfg(unix)] +use actix::actors::signal::{ProcessSignals, Subscribe}; + +/// Application state struct AppState { counter: Cell, } @@ -55,7 +58,6 @@ impl Handler for MyWebSocket { } } - fn main() { ::std::env::set_var("RUST_LOG", "actix_web=info"); let _ = env_logger::init(); @@ -74,7 +76,9 @@ fn main() { .bind("127.0.0.1:8080").unwrap() .start(); - if cfg!(target_os = "linux") { // Subscribe to unix signals + // Subscribe to unix signals + #[cfg(unix)] + { let signals = Arbiter::system_registry().get::(); signals.send(Subscribe(addr.subscriber())); } diff --git a/examples/template_tera/Cargo.toml b/examples/template_tera/Cargo.toml index 36e8d8e55..791934d09 100644 --- a/examples/template_tera/Cargo.toml +++ b/examples/template_tera/Cargo.toml @@ -2,6 +2,7 @@ name = "template-tera" version = "0.1.0" authors = ["Nikolay Kim "] +workspace = "../.." [dependencies] env_logger = "0.4" diff --git a/examples/template_tera/src/main.rs b/examples/template_tera/src/main.rs index c4f962e73..1b5552234 100644 --- a/examples/template_tera/src/main.rs +++ b/examples/template_tera/src/main.rs @@ -6,7 +6,9 @@ extern crate tera; use actix::*; use actix_web::*; -#[cfg(target_os = "linux")] use actix::actors::signal::{ProcessSignals, Subscribe}; +#[cfg(unix)] +use actix::actors::signal::{ProcessSignals, Subscribe}; + struct State { template: tera::Tera, // <- store tera template in application state @@ -43,7 +45,8 @@ fn main() { .bind("127.0.0.1:8080").unwrap() .start(); - if cfg!(target_os = "linux") { // Subscribe to unix signals + #[cfg(unix)] + { // Subscribe to unix signals let signals = Arbiter::system_registry().get::(); signals.send(Subscribe(addr.subscriber())); } diff --git a/examples/tls/Cargo.toml b/examples/tls/Cargo.toml index eda5e5fc8..a659bc36b 100644 --- a/examples/tls/Cargo.toml +++ b/examples/tls/Cargo.toml @@ -2,6 +2,7 @@ name = "tls-example" version = "0.1.0" authors = ["Nikolay Kim "] +workspace = "../.." [[bin]] name = "server" diff --git a/examples/tls/src/main.rs b/examples/tls/src/main.rs index 30625fdba..a754e0738 100644 --- a/examples/tls/src/main.rs +++ b/examples/tls/src/main.rs @@ -8,7 +8,8 @@ use std::io::Read; use actix::*; use actix_web::*; -#[cfg(target_os = "linux")] use actix::actors::signal::{ProcessSignals, Subscribe}; +#[cfg(unix)] +use actix::actors::signal::{ProcessSignals, Subscribe}; /// somple handle fn index(req: HttpRequest) -> Result { @@ -47,7 +48,9 @@ fn main() { .bind("127.0.0.1:8443").unwrap() .start_ssl(&pkcs12).unwrap(); - if cfg!(target_os = "linux") { // Subscribe to unix signals + // Subscribe to unix signals + #[cfg(unix)] + { let signals = Arbiter::system_registry().get::(); signals.send(Subscribe(addr.subscriber())); } diff --git a/examples/websocket-chat/Cargo.toml b/examples/websocket-chat/Cargo.toml index 6ca5f0ad3..517cf8163 100644 --- a/examples/websocket-chat/Cargo.toml +++ b/examples/websocket-chat/Cargo.toml @@ -2,6 +2,7 @@ name = "websocket-example" version = "0.1.0" authors = ["Nikolay Kim "] +workspace = "../.." [[bin]] name = "server" diff --git a/examples/websocket-chat/src/main.rs b/examples/websocket-chat/src/main.rs index a0236cbed..a4d3ce333 100644 --- a/examples/websocket-chat/src/main.rs +++ b/examples/websocket-chat/src/main.rs @@ -17,7 +17,8 @@ use std::time::Instant; use actix::*; use actix_web::*; -#[cfg(target_os = "linux")] use actix::actors::signal::{ProcessSignals, Subscribe}; +#[cfg(unix)] +use actix::actors::signal::{ProcessSignals, Subscribe}; mod codec; mod server; @@ -30,7 +31,7 @@ struct WsChatSessionState { } /// Entry point for our route -fn chat_route(req: HttpRequest) -> Result { +fn chat_route(req: HttpRequest) -> Result { ws::start( req, WsChatSession { @@ -215,7 +216,9 @@ fn main() { .bind("127.0.0.1:8080").unwrap() .start(); - if cfg!(target_os = "linux") { // Subscribe to unix signals + // Subscribe to unix signals + #[cfg(unix)] + { let signals = Arbiter::system_registry().get::(); signals.send(Subscribe(addr.subscriber())); } diff --git a/examples/websocket/src/main.rs b/examples/websocket/src/main.rs index a517a18a3..022ada344 100644 --- a/examples/websocket/src/main.rs +++ b/examples/websocket/src/main.rs @@ -13,8 +13,8 @@ use actix_web::*; #[cfg(target_os = "linux")] use actix::actors::signal::{ProcessSignals, Subscribe}; /// do websocket handshake and start `MyWebSocket` actor -fn ws_index(r: HttpRequest) -> Reply { - ws::start(r, MyWebSocket).into() +fn ws_index(r: HttpRequest) -> Result { + ws::start(r, MyWebSocket) } /// websocket connection is long running connection, it easier diff --git a/guide/src/qs_10.md b/guide/src/qs_10.md index 89bf8c6d3..8974f241d 100644 --- a/guide/src/qs_10.md +++ b/guide/src/qs_10.md @@ -72,7 +72,7 @@ Create `Logger` middleware with the specified `format`. Default `Logger` could be created with `default` method, it uses the default format: ```ignore - %a %t "%r" %s %b "%{Referrer}i" "%{User-Agent}i" %T + %a %t "%r" %s %b "%{Referer}i" "%{User-Agent}i" %T ``` ```rust # extern crate actix_web; diff --git a/guide/src/qs_14.md b/guide/src/qs_14.md index e9e489be7..26fa4ecfb 100644 --- a/guide/src/qs_14.md +++ b/guide/src/qs_14.md @@ -110,7 +110,7 @@ fn index(req: HttpRequest) -> Box> .and_then(|res| { match res { Ok(user) => Ok(httpcodes::HTTPOk.build().json(user)?), - Err(_) => Ok(httpcodes::HTTPInternalServerError.response()) + Err(_) => Ok(httpcodes::HTTPInternalServerError.into()) } }) .responder() diff --git a/guide/src/qs_4.md b/guide/src/qs_4.md index 1a82d51bd..42afb9219 100644 --- a/guide/src/qs_4.md +++ b/guide/src/qs_4.md @@ -65,7 +65,7 @@ impl Handler for MyHandler { /// Handle request fn handle(&mut self, req: HttpRequest) -> Self::Result { self.0 += 1; - httpcodes::HTTPOk.response() + httpcodes::HTTPOk.into() } } # fn main() {} @@ -91,7 +91,7 @@ impl Handler for MyHandler { fn handle(&mut self, req: HttpRequest) -> Self::Result { let num = self.0.load(Ordering::Relaxed) + 1; self.0.store(num, Ordering::Relaxed); - httpcodes::HTTPOk.response() + httpcodes::HTTPOk.into() } } @@ -104,7 +104,7 @@ fn main() { move || { let cloned = inc.clone(); Application::new() - .resource("/", move |r| r.h(MyHandler(cloned))) + .resource("/", move |r| r.h(MyHandler(cloned))) }) .bind("127.0.0.1:8088").unwrap() .start(); diff --git a/guide/src/qs_7.md b/guide/src/qs_7.md index 5d3447514..7cce5932b 100644 --- a/guide/src/qs_7.md +++ b/guide/src/qs_7.md @@ -246,7 +246,7 @@ fn index(mut req: HttpRequest) -> Box> { .from_err() .and_then(|params| { // <- url encoded parameters println!("==== BODY ==== {:?}", params); - ok(httpcodes::HTTPOk.response()) + ok(httpcodes::HTTPOk.into()) }) .responder() } diff --git a/guide/src/qs_8.md b/guide/src/qs_8.md index 7661b9ad4..53713a205 100644 --- a/guide/src/qs_8.md +++ b/guide/src/qs_8.md @@ -20,10 +20,10 @@ use actix_web::test::TestRequest; fn index(req: HttpRequest) -> HttpResponse { if let Some(hdr) = req.headers().get(header::CONTENT_TYPE) { if let Ok(s) = hdr.to_str() { - return httpcodes::HTTPOk.response() + return httpcodes::HTTPOk.into() } } - httpcodes::HTTPBadRequest.response() + httpcodes::HTTPBadRequest.into() } fn main() { @@ -60,7 +60,7 @@ use actix_web::*; use actix_web::test::TestServer; fn index(req: HttpRequest) -> HttpResponse { - httpcodes::HTTPOk.response() + httpcodes::HTTPOk.into() } fn main() { @@ -80,7 +80,7 @@ use actix_web::*; use actix_web::test::TestServer; fn index(req: HttpRequest) -> HttpResponse { - httpcodes::HTTPOk.response() + httpcodes::HTTPOk.into() } /// This function get called by http server. diff --git a/src/application.rs b/src/application.rs index 0fb44bedd..02ae078ba 100644 --- a/src/application.rs +++ b/src/application.rs @@ -128,7 +128,7 @@ impl Application where S: 'static { } } - /// Set application prefix. + /// Set application prefix /// /// Only requests that matches application's prefix get processed by this application. /// Application prefix always contains laading "/" slash. If supplied prefix diff --git a/src/body.rs b/src/body.rs index b9e6676e7..53af6e40e 100644 --- a/src/body.rs +++ b/src/body.rs @@ -5,6 +5,7 @@ use bytes::{Bytes, BytesMut}; use futures::Stream; use error::Error; +use context::ActorHttpContext; /// Type represent streaming body pub type BodyStream = Box>; @@ -18,12 +19,8 @@ pub enum Body { /// Unspecified streaming response. Developer is responsible for setting /// right `Content-Length` or `Transfer-Encoding` headers. Streaming(BodyStream), - /// Upgrade connection. - Upgrade(BodyStream), - /// Special body type for actor streaming response. - StreamingContext, - /// Special body type for actor upgrade response. - UpgradeContext, + /// Special body type for actor response. + Actor(Box), } /// Represents various types of binary body. @@ -51,8 +48,7 @@ impl Body { #[inline] pub fn is_streaming(&self) -> bool { match *self { - Body::Streaming(_) | Body::StreamingContext - | Body::Upgrade(_) | Body::UpgradeContext => true, + Body::Streaming(_) | Body::Actor(_) => true, _ => false } } @@ -83,15 +79,7 @@ impl PartialEq for Body { Body::Binary(ref b2) => b == b2, _ => false, }, - Body::StreamingContext => match *other { - Body::StreamingContext => true, - _ => false, - }, - Body::UpgradeContext => match *other { - Body::UpgradeContext => true, - _ => false, - }, - Body::Streaming(_) | Body::Upgrade(_) => false, + Body::Streaming(_) | Body::Actor(_) => false, } } } @@ -102,9 +90,7 @@ impl fmt::Debug for Body { Body::Empty => write!(f, "Body::Empty"), Body::Binary(ref b) => write!(f, "Body::Binary({:?})", b), Body::Streaming(_) => write!(f, "Body::Streaming(_)"), - Body::Upgrade(_) => write!(f, "Body::Upgrade(_)"), - Body::StreamingContext => write!(f, "Body::StreamingContext"), - Body::UpgradeContext => write!(f, "Body::UpgradeContext"), + Body::Actor(_) => write!(f, "Body::Actor(_)"), } } } @@ -115,6 +101,12 @@ impl From for Body where T: Into{ } } +impl From> for Body { + fn from(ctx: Box) -> Body { + Body::Actor(ctx) + } +} + impl Binary { #[inline] pub fn is_empty(&self) -> bool { diff --git a/src/context.rs b/src/context.rs index f0e80de1a..1cdb6b9b4 100644 --- a/src/context.rs +++ b/src/context.rs @@ -1,6 +1,7 @@ use std; +use std::marker::PhantomData; use std::collections::VecDeque; -use futures::{Async, Poll}; +use futures::{Async, Future, Poll}; use futures::sync::oneshot::Sender; use futures::unsync::oneshot; @@ -11,19 +12,18 @@ use actix::dev::{AsyncContextApi, ActorAddressCell, ActorItemsCell, ActorWaitCel Envelope, ToEnvelope, RemoteEnvelope}; use body::{Body, Binary}; -use error::Error; +use error::{Error, Result}; use httprequest::HttpRequest; use httpresponse::HttpResponse; -pub(crate) trait IoContext: 'static { +pub trait ActorHttpContext: 'static { fn disconnected(&mut self); fn poll(&mut self) -> Poll, Error>; } #[derive(Debug)] -pub(crate) enum Frame { - Message(HttpResponse), +pub enum Frame { Payload(Option), Drain(oneshot::Sender<()>), } @@ -31,7 +31,7 @@ pub(crate) enum Frame { /// Http actor execution context pub struct HttpContext where A: Actor>, { - act: A, + act: Option, state: ActorState, modified: bool, items: ActorItemsCell, @@ -39,7 +39,6 @@ pub struct HttpContext where A: Actor>, stream: VecDeque, wait: ActorWaitCell, request: HttpRequest, - streaming: bool, disconnected: bool, } @@ -101,12 +100,15 @@ impl AsyncContextApi for HttpContext where A: Actor } } -impl HttpContext where A: Actor { +impl HttpContext where A: Actor { - pub fn new(req: HttpRequest, actor: A) -> HttpContext - { + pub fn new(req: HttpRequest, actor: A) -> HttpContext { + HttpContext::from_request(req).actor(actor) + } + + pub fn from_request(req: HttpRequest) -> HttpContext { HttpContext { - act: actor, + act: None, state: ActorState::Started, modified: false, items: ActorItemsCell::default(), @@ -114,10 +116,24 @@ impl HttpContext where A: Actor { wait: ActorWaitCell::default(), stream: VecDeque::new(), request: req, - streaming: false, disconnected: false, } } + + pub fn actor(mut self, actor: A) -> HttpContext { + self.act = Some(actor); + self + } + + pub fn with_actor(mut self, actor: A, mut resp: HttpResponse) -> Result { + if self.act.is_some() { + panic!("Actor is set already"); + } + self.act = Some(actor); + + resp.replace_body(Body::Actor(Box::new(self))); + Ok(resp) + } } impl HttpContext where A: Actor { @@ -132,24 +148,12 @@ impl HttpContext where A: Actor { &mut self.request } - /// Send response to peer - pub fn start>(&mut self, response: R) { - let resp = response.into(); - match *resp.body() { - Body::StreamingContext | Body::UpgradeContext => self.streaming = true, - _ => (), - } - self.stream.push_back(Frame::Message(resp)) - } - /// Write payload pub fn write>(&mut self, data: B) { - if self.streaming { - if !self.disconnected { - self.stream.push_back(Frame::Payload(Some(data.into()))) - } + if !self.disconnected { + self.stream.push_back(Frame::Payload(Some(data.into()))); } else { - warn!("Trying to write response body for non-streaming response"); + warn!("Trying to write to disconnected response"); } } @@ -159,11 +163,11 @@ impl HttpContext where A: Actor { } /// Returns drain future - pub fn drain(&mut self) -> oneshot::Receiver<()> { + pub fn drain(&mut self) -> Drain { let (tx, rx) = oneshot::channel(); self.modified = true; self.stream.push_back(Frame::Drain(tx)); - rx + Drain::new(rx) } /// Check if connection still open @@ -193,7 +197,7 @@ impl HttpContext where A: Actor { } } -impl IoContext for HttpContext where A: Actor, S: 'static { +impl ActorHttpContext for HttpContext where A: Actor, S: 'static { fn disconnected(&mut self) { self.items.stop(); @@ -204,8 +208,11 @@ impl IoContext for HttpContext where A: Actor, S: 'sta } fn poll(&mut self) -> Poll, Error> { + if self.act.is_none() { + return Ok(Async::Ready(None)) + } let act: &mut A = unsafe { - std::mem::transmute(&mut self.act as &mut A) + std::mem::transmute(self.act.as_mut().unwrap() as &mut A) }; let ctx: &mut HttpContext = unsafe { std::mem::transmute(self as &mut HttpContext) @@ -303,3 +310,39 @@ impl ToEnvelope for HttpContext RemoteEnvelope::new(msg, tx).into() } } + +impl From> for Body + where A: Actor>, + S: 'static +{ + fn from(ctx: HttpContext) -> Body { + Body::Actor(Box::new(ctx)) + } +} + +pub struct Drain { + fut: oneshot::Receiver<()>, + _a: PhantomData, +} + +impl Drain { + fn new(fut: oneshot::Receiver<()>) -> Self { + Drain { + fut: fut, + _a: PhantomData + } + } +} + +impl ActorFuture for Drain { + type Item = (); + type Error = (); + type Actor = A; + + fn poll(&mut self, + _: &mut A, + _: &mut ::Context) -> Poll + { + self.fut.poll().map_err(|_| ()) + } +} diff --git a/src/encoding.rs b/src/encoding.rs index 3254f6404..e30ba9f40 100644 --- a/src/encoding.rs +++ b/src/encoding.rs @@ -416,8 +416,20 @@ impl PayloadEncoder { resp.headers_mut().remove(CONTENT_LENGTH); TransferEncoding::eof(buf) } - Body::Streaming(_) | Body::StreamingContext => { - if resp.chunked() { + Body::Streaming(_) | Body::Actor(_) => { + if resp.upgrade() { + if version == Version::HTTP_2 { + error!("Connection upgrade is forbidden for HTTP/2"); + } else { + resp.headers_mut().insert( + CONNECTION, HeaderValue::from_static("upgrade")); + } + if encoding != ContentEncoding::Identity { + encoding = ContentEncoding::Identity; + resp.headers_mut().remove(CONTENT_ENCODING); + } + TransferEncoding::eof(buf) + } else if resp.chunked() { resp.headers_mut().remove(CONTENT_LENGTH); if version != Version::HTTP_11 { error!("Chunked transfer encoding is forbidden for {:?}", version); @@ -446,19 +458,6 @@ impl PayloadEncoder { TransferEncoding::eof(buf) } } - Body::Upgrade(_) | Body::UpgradeContext => { - if version == Version::HTTP_2 { - error!("Connection upgrade is forbidden for HTTP/2"); - } else { - resp.headers_mut().insert( - CONNECTION, HeaderValue::from_static("upgrade")); - } - if encoding != ContentEncoding::Identity { - encoding = ContentEncoding::Identity; - resp.headers_mut().remove(CONTENT_ENCODING); - } - TransferEncoding::eof(buf) - } }; resp.replace_body(body); diff --git a/src/error.rs b/src/error.rs index ef4b65c56..31ae69fbb 100644 --- a/src/error.rs +++ b/src/error.rs @@ -114,14 +114,6 @@ impl ResponseError for header::InvalidHeaderValue {} /// `InternalServerError` for `futures::Canceled` impl ResponseError for Canceled {} -/// Internal error -#[doc(hidden)] -#[derive(Fail, Debug)] -#[fail(display="Unexpected task frame")] -pub struct UnexpectedTaskFrame; - -impl ResponseError for UnexpectedTaskFrame {} - /// A set of errors that can occur during parsing HTTP streams #[derive(Fail, Debug)] pub enum ParseError { diff --git a/src/h1writer.rs b/src/h1writer.rs index 2deca9d14..011e56ace 100644 --- a/src/h1writer.rs +++ b/src/h1writer.rs @@ -192,7 +192,6 @@ impl Writer for H1Writer { if let Body::Binary(bytes) = body { self.encoder.write(bytes.as_ref())?; - return Ok(WriterState::Done) } else { msg.replace_body(body); } diff --git a/src/h2writer.rs b/src/h2writer.rs index d3091c6f3..51027f7f4 100644 --- a/src/h2writer.rs +++ b/src/h2writer.rs @@ -92,7 +92,7 @@ impl H2Writer { let cap = cmp::min(buffer.len(), CHUNK_SIZE); stream.reserve_capacity(cap); } else { - return Ok(WriterState::Done) + return Ok(WriterState::Pause) } } Err(_) => { @@ -130,9 +130,23 @@ impl Writer for H2Writer { // using helpers::date is quite a lot faster if !msg.headers().contains_key(DATE) { let mut bytes = BytesMut::with_capacity(29); - helpers::date(&mut bytes); - msg.headers_mut().insert( - DATE, HeaderValue::try_from(bytes.freeze()).unwrap()); + helpers::date_value(&mut bytes); + msg.headers_mut().insert(DATE, HeaderValue::try_from(bytes.freeze()).unwrap()); + } + + let body = msg.replace_body(Body::Empty); + match body { + Body::Binary(ref bytes) => { + let mut val = BytesMut::new(); + helpers::convert_usize(bytes.len(), &mut val); + let l = val.len(); + msg.headers_mut().insert( + CONTENT_LENGTH, HeaderValue::try_from(val.split_to(l-2).freeze()).unwrap()); + } + Body::Empty => { + msg.headers_mut().insert(CONTENT_LENGTH, HeaderValue::from_static("0")); + }, + _ => (), } let mut resp = Response::new(()); @@ -142,19 +156,6 @@ impl Writer for H2Writer { resp.headers_mut().insert(key, value.clone()); } - match *msg.body() { - Body::Binary(ref bytes) => { - let mut val = BytesMut::new(); - helpers::convert_usize(bytes.len(), &mut val); - resp.headers_mut().insert( - CONTENT_LENGTH, HeaderValue::try_from(val.freeze()).unwrap()); - } - Body::Empty => { - resp.headers_mut().insert(CONTENT_LENGTH, HeaderValue::from_static("0")); - }, - _ => (), - } - match self.respond.send_response(resp, self.flags.contains(Flags::EOF)) { Ok(stream) => self.stream = Some(stream), @@ -162,20 +163,19 @@ impl Writer for H2Writer { return Err(io::Error::new(io::ErrorKind::Other, "err")), } - // trace!("Response: {:?}", msg); + trace!("Response: {:?}", msg); - if msg.body().is_binary() { - if let Body::Binary(bytes) = msg.replace_body(Body::Empty) { - self.flags.insert(Flags::EOF); - self.encoder.write(bytes.as_ref())?; - if let Some(ref mut stream) = self.stream { - stream.reserve_capacity(cmp::min(self.encoder.len(), CHUNK_SIZE)); - } - return Ok(WriterState::Done) + if let Body::Binary(bytes) = body { + self.flags.insert(Flags::EOF); + self.encoder.write(bytes.as_ref())?; + if let Some(ref mut stream) = self.stream { + stream.reserve_capacity(cmp::min(self.encoder.len(), CHUNK_SIZE)); } + Ok(WriterState::Pause) + } else { + msg.replace_body(body); + Ok(WriterState::Done) } - - Ok(WriterState::Done) } fn write(&mut self, payload: &[u8]) -> Result { diff --git a/src/handler.rs b/src/handler.rs index deab468ea..106fecc8e 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -1,13 +1,11 @@ use std::marker::PhantomData; -use actix::Actor; -use futures::future::{Future, ok, err}; use regex::Regex; +use futures::future::{Future, ok, err}; use http::{header, StatusCode, Error as HttpError}; use body::Body; use error::Error; -use context::{HttpContext, IoContext}; use httprequest::HttpRequest; use httpresponse::HttpResponse; @@ -69,20 +67,11 @@ pub struct Reply(ReplyItem); pub(crate) enum ReplyItem { Message(HttpResponse), - Actor(Box), Future(Box>), } impl Reply { - /// Create actor response - #[inline] - pub fn actor(ctx: HttpContext) -> Reply - where A: Actor>, S: 'static - { - Reply(ReplyItem::Actor(Box::new(ctx))) - } - /// Create async response #[inline] pub fn async(fut: F) -> Reply @@ -163,25 +152,6 @@ impl> From> for Reply { } } -impl>, S: 'static> Responder for HttpContext -{ - type Item = Reply; - type Error = Error; - - #[inline] - fn respond_to(self, _: HttpRequest) -> Result { - Ok(Reply(ReplyItem::Actor(Box::new(self)))) - } -} - -impl>, S: 'static> From> for Reply { - - #[inline] - fn from(ctx: HttpContext) -> Reply { - Reply(ReplyItem::Actor(Box::new(ctx))) - } -} - impl Responder for Box> where I: Responder + 'static, E: Into + 'static diff --git a/src/helpers.rs b/src/helpers.rs index 8211b03d1..ba6fd49d2 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -11,9 +11,9 @@ use http::Version; use httprequest::HttpMessage; // "Sun, 06 Nov 1994 08:49:37 GMT".len() -pub const DATE_VALUE_LENGTH: usize = 29; +pub(crate) const DATE_VALUE_LENGTH: usize = 29; -pub fn date(dst: &mut BytesMut) { +pub(crate) fn date(dst: &mut BytesMut) { CACHED.with(|cache| { let mut buf: [u8; 39] = unsafe { mem::uninitialized() }; buf[..6].copy_from_slice(b"date: "); @@ -23,7 +23,13 @@ pub fn date(dst: &mut BytesMut) { }) } -pub fn update_date() { +pub(crate) fn date_value(dst: &mut BytesMut) { + CACHED.with(|cache| { + dst.extend_from_slice(cache.borrow().buffer()); + }) +} + +pub(crate) fn update_date() { CACHED.with(|cache| { cache.borrow_mut().update(); }); diff --git a/src/httpcodes.rs b/src/httpcodes.rs index abe226cbe..e27c56237 100644 --- a/src/httpcodes.rs +++ b/src/httpcodes.rs @@ -54,9 +54,6 @@ impl StaticResponse { pub fn build(&self) -> HttpResponseBuilder { HttpResponse::build(self.0) } - pub fn response(&self) -> HttpResponse { - HttpResponse::new(self.0, Body::Empty) - } pub fn with_reason(self, reason: &'static str) -> HttpResponse { let mut resp = HttpResponse::new(self.0, Body::Empty); resp.set_reason(reason); @@ -92,7 +89,7 @@ impl Responder for StaticResponse { impl From for HttpResponse { fn from(st: StaticResponse) -> Self { - st.response() + HttpResponse::new(st.0, Body::Empty) } } @@ -153,7 +150,7 @@ mod tests { #[test] fn test_response() { - let resp = HTTPOk.response(); + let resp: HttpResponse = HTTPOk.into(); assert_eq!(resp.status(), StatusCode::OK); } @@ -165,7 +162,7 @@ mod tests { #[test] fn test_with_reason() { - let resp = HTTPOk.response(); + let resp: HttpResponse = HTTPOk.into(); assert_eq!(resp.reason(), "OK"); let resp = HTTPBadRequest.with_reason("test"); diff --git a/src/httprequest.rs b/src/httprequest.rs index 96c36dbb3..dd8de37dc 100644 --- a/src/httprequest.rs +++ b/src/httprequest.rs @@ -135,6 +135,12 @@ impl HttpRequest<()> { impl HttpRequest { + #[inline] + /// Construct new http request with state. + pub fn change_state(self, state: Rc) -> HttpRequest { + HttpRequest(self.0, Some(state), self.2.clone()) + } + #[inline] /// Construct new http request without state. pub(crate) fn clone_without_state(&self) -> HttpRequest { @@ -447,7 +453,7 @@ impl HttpRequest { /// } /// }) /// .finish() // <- Stream::finish() combinator from actix - /// .map(|_| httpcodes::HTTPOk.response()) + /// .map(|_| httpcodes::HTTPOk.into()) /// .responder() /// } /// # fn main() {} @@ -477,7 +483,7 @@ impl HttpRequest { /// .from_err() /// .and_then(|params| { // <- url encoded parameters /// println!("==== BODY ==== {:?}", params); - /// ok(httpcodes::HTTPOk.response()) + /// ok(httpcodes::HTTPOk.into()) /// }) /// .responder() /// } @@ -512,7 +518,7 @@ impl HttpRequest { /// .from_err() /// .and_then(|val: MyObj| { // <- deserialized value /// println!("==== BODY ==== {:?}", val); - /// Ok(httpcodes::HTTPOk.response()) + /// Ok(httpcodes::HTTPOk.into()) /// }).responder() /// } /// # fn main() {} diff --git a/src/httpresponse.rs b/src/httpresponse.rs index 69bb71a72..5d5e85fcb 100644 --- a/src/httpresponse.rs +++ b/src/httpresponse.rs @@ -138,6 +138,7 @@ impl HttpResponse { } /// Connection upgrade status + #[inline] pub fn upgrade(&self) -> bool { self.get_ref().connection_type == Some(ConnectionType::Upgrade) } @@ -155,11 +156,13 @@ impl HttpResponse { } /// is chunked encoding enabled + #[inline] pub fn chunked(&self) -> bool { self.get_ref().chunked } /// Content encoding + #[inline] pub fn content_encoding(&self) -> &ContentEncoding { &self.get_ref().encoding } @@ -171,6 +174,7 @@ impl HttpResponse { } /// Get body os this response + #[inline] pub fn body(&self) -> &Body { &self.get_ref().body } @@ -443,6 +447,15 @@ impl HttpResponseBuilder { pub fn finish(&mut self) -> Result { self.body(Body::Empty) } + + /// This method construct new `HttpResponseBuilder` + pub fn take(&mut self) -> HttpResponseBuilder { + HttpResponseBuilder { + response: self.response.take(), + err: self.err.take(), + cookies: self.cookies.take(), + } + } } #[inline] diff --git a/src/json.rs b/src/json.rs index a33fa46d6..263f6028f 100644 --- a/src/json.rs +++ b/src/json.rs @@ -71,7 +71,7 @@ impl Responder for Json { /// .from_err() /// .and_then(|val: MyObj| { // <- deserialized value /// println!("==== BODY ==== {:?}", val); -/// Ok(httpcodes::HTTPOk.response()) +/// Ok(httpcodes::HTTPOk.into()) /// }).responder() /// } /// # fn main() {} diff --git a/src/middleware/logger.rs b/src/middleware/logger.rs index e498ad4c9..ac1d6cc47 100644 --- a/src/middleware/logger.rs +++ b/src/middleware/logger.rs @@ -19,7 +19,7 @@ use middleware::{Middleware, Started, Finished}; /// Default `Logger` could be created with `default` method, it uses the default format: /// /// ```ignore -/// %a %t "%r" %s %b "%{Referrer}i" "%{User-Agent}i" %T +/// %a %t "%r" %s %b "%{Referer}i" "%{User-Agent}i" %T /// ``` /// ```rust /// # extern crate actix_web; @@ -75,7 +75,7 @@ impl Default for Logger { /// Create `Logger` middleware with format: /// /// ```ignore - /// %a %t "%r" %s %b "%{Referrer}i" "%{User-Agent}i" %T + /// %a %t "%r" %s %b "%{Referer}i" "%{User-Agent}i" %T /// ``` fn default() -> Logger { Logger { format: Format::default() } @@ -121,7 +121,7 @@ struct Format(Vec); impl Default for Format { /// Return the default formatting style for the `Logger`: fn default() -> Format { - Format::new(r#"%a %t "%r" %s %b "%{Referrer}i" "%{User-Agent}i" %T"#) + Format::new(r#"%a %t "%r" %s %b "%{Referer}i" "%{User-Agent}i" %T"#) } } diff --git a/src/payload.rs b/src/payload.rs index 7c921070c..df2e4f7fb 100644 --- a/src/payload.rs +++ b/src/payload.rs @@ -9,20 +9,14 @@ use futures::{Future, Async, Poll, Stream}; use futures::task::{Task, current as current_task}; use body::BodyStream; -use actix::ResponseType; use error::PayloadError; pub(crate) const DEFAULT_BUFFER_SIZE: usize = 65_536; // max buffer size 64k /// Just Bytes object -#[derive(PartialEq)] +#[derive(PartialEq, Message)] pub struct PayloadItem(pub Bytes); -impl ResponseType for PayloadItem { - type Item = (); - type Error = (); -} - impl Deref for PayloadItem { type Target = Bytes; @@ -91,27 +85,27 @@ impl Payload { } /// Get first available chunk of data. - pub fn readany(&mut self) -> ReadAny { + pub fn readany(&self) -> ReadAny { ReadAny(Rc::clone(&self.inner)) } /// Get exact number of bytes - pub fn readexactly(&mut self, size: usize) -> ReadExactly { + pub fn readexactly(&self, size: usize) -> ReadExactly { ReadExactly(Rc::clone(&self.inner), size) } /// Read until `\n` - pub fn readline(&mut self) -> ReadLine { + pub fn readline(&self) -> ReadLine { ReadLine(Rc::clone(&self.inner)) } /// Read until match line - pub fn readuntil(&mut self, line: &[u8]) -> ReadUntil { + pub fn readuntil(&self, line: &[u8]) -> ReadUntil { ReadUntil(Rc::clone(&self.inner), line.to_vec()) } #[doc(hidden)] - pub fn readall(&mut self) -> Option { + pub fn readall(&self) -> Option { self.inner.borrow_mut().readall() } @@ -132,20 +126,16 @@ impl Payload { /// Convert payload into compatible `HttpResponse` body stream pub fn stream(self) -> BodyStream { - Box::new(self.map_err(|e| e.into())) + Box::new(self.map(|i| i.0).map_err(|e| e.into())) } } impl Stream for Payload { - type Item = Bytes; + type Item = PayloadItem; type Error = PayloadError; - fn poll(&mut self) -> Poll, PayloadError> { - match self.inner.borrow_mut().readany()? { - Async::Ready(Some(item)) => Ok(Async::Ready(Some(item.0))), - Async::Ready(None) => Ok(Async::Ready(None)), - Async::NotReady => Ok(Async::NotReady), - } + fn poll(&mut self) -> Poll, PayloadError> { + self.inner.borrow_mut().readany() } } @@ -474,7 +464,7 @@ mod tests { #[test] fn test_basic() { Core::new().unwrap().run(lazy(|| { - let (_, mut payload) = Payload::new(false); + let (_, payload) = Payload::new(false); assert!(!payload.eof()); assert!(payload.is_empty()); @@ -489,7 +479,7 @@ mod tests { #[test] fn test_eof() { Core::new().unwrap().run(lazy(|| { - let (mut sender, mut payload) = Payload::new(false); + let (mut sender, payload) = Payload::new(false); assert_eq!(Async::NotReady, payload.readany().poll().ok().unwrap()); assert!(!payload.eof()); @@ -514,7 +504,7 @@ mod tests { #[test] fn test_err() { Core::new().unwrap().run(lazy(|| { - let (mut sender, mut payload) = Payload::new(false); + let (mut sender, payload) = Payload::new(false); assert_eq!(Async::NotReady, payload.readany().poll().ok().unwrap()); @@ -528,7 +518,7 @@ mod tests { #[test] fn test_readany() { Core::new().unwrap().run(lazy(|| { - let (mut sender, mut payload) = Payload::new(false); + let (mut sender, payload) = Payload::new(false); sender.feed_data(Bytes::from("line1")); @@ -552,7 +542,7 @@ mod tests { #[test] fn test_readexactly() { Core::new().unwrap().run(lazy(|| { - let (mut sender, mut payload) = Payload::new(false); + let (mut sender, payload) = Payload::new(false); assert_eq!(Async::NotReady, payload.readexactly(2).poll().ok().unwrap()); @@ -579,7 +569,7 @@ mod tests { #[test] fn test_readuntil() { Core::new().unwrap().run(lazy(|| { - let (mut sender, mut payload) = Payload::new(false); + let (mut sender, payload) = Payload::new(false); assert_eq!(Async::NotReady, payload.readuntil(b"ne").poll().ok().unwrap()); diff --git a/src/pipeline.rs b/src/pipeline.rs index e8d739428..f5f4799c4 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -8,8 +8,8 @@ use futures::unsync::oneshot; use channel::HttpHandlerTask; use body::{Body, BodyStream}; -use context::{Frame, IoContext}; -use error::{Error, UnexpectedTaskFrame}; +use context::{Frame, ActorHttpContext}; +use error::Error; use handler::{Reply, ReplyItem}; use h1writer::{Writer, WriterState}; use httprequest::HttpRequest; @@ -38,7 +38,7 @@ struct PipelineInfo { req: HttpRequest, count: usize, mws: Rc>>>, - context: Option>, + context: Option>, error: Option, } @@ -72,12 +72,6 @@ impl PipelineInfo { } } -enum PipelineResponse { - None, - Context(Box), - Response(Box>), -} - impl> Pipeline { pub fn new(req: HttpRequest, @@ -364,7 +358,7 @@ impl> StartMiddlewares { // waiting for response struct WaitingResponse { - stream: PipelineResponse, + fut: Box>, _s: PhantomData, _h: PhantomData, } @@ -377,65 +371,22 @@ impl WaitingResponse { match reply.into() { ReplyItem::Message(resp) => RunMiddlewares::init(info, resp), - ReplyItem::Actor(ctx) => - PipelineState::Handler( - WaitingResponse { stream: PipelineResponse::Context(ctx), - _s: PhantomData, _h: PhantomData }), ReplyItem::Future(fut) => PipelineState::Handler( - WaitingResponse { stream: PipelineResponse::Response(fut), - _s: PhantomData, _h: PhantomData }), + WaitingResponse { fut: fut, _s: PhantomData, _h: PhantomData }), } } fn poll(mut self, info: &mut PipelineInfo) -> Result, PipelineState> { - let stream = mem::replace(&mut self.stream, PipelineResponse::None); - - match stream { - PipelineResponse::Context(mut context) => { - loop { - match context.poll() { - Ok(Async::Ready(Some(frame))) => { - match frame { - Frame::Message(resp) => { - info.context = Some(context); - return Ok(RunMiddlewares::init(info, resp)) - } - Frame::Payload(_) | Frame::Drain(_) => (), - } - }, - Ok(Async::Ready(None)) => { - error!("Unexpected eof"); - let err: Error = UnexpectedTaskFrame.into(); - return Ok(ProcessResponse::init(err.into())) - }, - Ok(Async::NotReady) => { - self.stream = PipelineResponse::Context(context); - return Err(PipelineState::Handler(self)) - }, - Err(err) => - return Ok(ProcessResponse::init(err.into())) - } - } - }, - PipelineResponse::Response(mut fut) => { - match fut.poll() { - Ok(Async::NotReady) => { - self.stream = PipelineResponse::Response(fut); - Err(PipelineState::Handler(self)) - } - Ok(Async::Ready(response)) => - Ok(RunMiddlewares::init(info, response)), - Err(err) => - Ok(ProcessResponse::init(err.into())), - } - } - PipelineResponse::None => { - unreachable!("Broken internal state") - } + match self.fut.poll() { + Ok(Async::NotReady) => + Err(PipelineState::Handler(self)), + Ok(Async::Ready(response)) => + Ok(RunMiddlewares::init(info, response)), + Err(err) => + Ok(ProcessResponse::init(err.into())), } - } } @@ -554,7 +505,7 @@ impl RunningState { enum IOState { Response, Payload(BodyStream), - Context, + Actor(Box), Done, } @@ -576,12 +527,10 @@ impl ProcessResponse { { if self.drain.is_none() && self.running != RunningState::Paused { // if task is paused, write buffer is probably full - loop { let result = match mem::replace(&mut self.iostate, IOState::Done) { IOState::Response => { - let result = match io.start(info.req_mut().get_inner(), - &mut self.resp) { + let result = match io.start(info.req_mut().get_inner(), &mut self.resp) { Ok(res) => res, Err(err) => { info.error = Some(err.into()); @@ -590,10 +539,10 @@ impl ProcessResponse { }; match self.resp.replace_body(Body::Empty) { - Body::Streaming(stream) | Body::Upgrade(stream) => + Body::Streaming(stream) => self.iostate = IOState::Payload(stream), - Body::StreamingContext | Body::UpgradeContext => - self.iostate = IOState::Context, + Body::Actor(ctx) => + self.iostate = IOState::Actor(ctx), _ => (), } @@ -642,17 +591,12 @@ impl ProcessResponse { } } }, - IOState::Context => { - match info.context.as_mut().unwrap().poll() { + IOState::Actor(mut ctx) => { + match ctx.poll() { Ok(Async::Ready(Some(frame))) => { match frame { - Frame::Message(msg) => { - error!("Unexpected message frame {:?}", msg); - info.error = Some(UnexpectedTaskFrame.into()); - return Ok( - FinishingMiddlewares::init(info, self.resp)) - }, Frame::Payload(None) => { + info.context = Some(ctx); self.iostate = IOState::Done; if let Err(err) = io.write_eof() { info.error = Some(err.into()); @@ -662,7 +606,7 @@ impl ProcessResponse { break }, Frame::Payload(Some(chunk)) => { - self.iostate = IOState::Context; + self.iostate = IOState::Actor(ctx); match io.write(chunk.as_ref()) { Err(err) => { info.error = Some(err.into()); @@ -680,11 +624,10 @@ impl ProcessResponse { }, Ok(Async::Ready(None)) => { self.iostate = IOState::Done; - info.context.take(); break } Ok(Async::NotReady) => { - self.iostate = IOState::Context; + self.iostate = IOState::Actor(ctx); break } Err(err) => { diff --git a/src/router.rs b/src/router.rs index eb1027fb3..ebd763bfd 100644 --- a/src/router.rs +++ b/src/router.rs @@ -190,7 +190,7 @@ impl Pattern { } /// Extract pattern parameters from the text - pub(crate) fn update_match_info(&self, text: &str, req: &mut HttpRequest) { + pub fn update_match_info(&self, text: &str, req: &mut HttpRequest) { if !self.names.is_empty() { if let Some(captures) = self.re.captures(text) { let mut idx = 0; @@ -208,8 +208,7 @@ impl Pattern { } /// Build pattern path. - pub fn path(&self, prefix: Option<&str>, elements: U) - -> Result + pub fn path(&self, prefix: Option<&str>, elements: U) -> Result where U: IntoIterator, I: AsRef, { diff --git a/src/test.rs b/src/test.rs index 88c2044f5..a0d8a8de5 100644 --- a/src/test.rs +++ b/src/test.rs @@ -41,7 +41,7 @@ use httpresponse::HttpResponse; /// # extern crate reqwest; /// # /// # fn my_handler(req: HttpRequest) -> HttpResponse { -/// # httpcodes::HTTPOk.response() +/// # httpcodes::HTTPOk.into() /// # } /// # /// # fn main() { @@ -228,9 +228,9 @@ impl Iterator for TestApp { /// /// fn index(req: HttpRequest) -> HttpResponse { /// if let Some(hdr) = req.headers().get(header::CONTENT_TYPE) { -/// httpcodes::HTTPOk.response() +/// httpcodes::HTTPOk.into() /// } else { -/// httpcodes::HTTPBadRequest.response() +/// httpcodes::HTTPBadRequest.into() /// } /// } /// @@ -365,7 +365,6 @@ impl TestRequest { Ok(resp) => { match resp.into().into() { ReplyItem::Message(resp) => Ok(resp), - ReplyItem::Actor(_) => panic!("Actor handler is not supported."), ReplyItem::Future(_) => panic!("Async handler is not supported."), } }, diff --git a/src/ws.rs b/src/ws.rs index 5832e5c29..69e74aadb 100644 --- a/src/ws.rs +++ b/src/ws.rs @@ -12,7 +12,7 @@ //! use actix_web::*; //! //! // do websocket handshake and start actor -//! fn ws_index(req: HttpRequest) -> Result { +//! fn ws_index(req: HttpRequest) -> Result { //! ws::start(req, Ws) //! } //! @@ -52,13 +52,11 @@ use futures::{Async, Poll, Stream}; use actix::{Actor, AsyncContext, ResponseType, StreamHandler}; -use body::Body; -use context::HttpContext; -use handler::Reply; use payload::ReadAny; use error::{Error, WsHandshakeError}; +use context::HttpContext; use httprequest::HttpRequest; -use httpresponse::{ConnectionType, HttpResponse}; +use httpresponse::{ConnectionType, HttpResponse, HttpResponseBuilder}; use wsframe; use wsproto::*; @@ -88,17 +86,17 @@ impl ResponseType for Message { } /// Do websocket handshake and start actor -pub fn start(mut req: HttpRequest, actor: A) -> Result +pub fn start(mut req: HttpRequest, actor: A) -> Result where A: Actor> + StreamHandler, S: 'static { - let resp = handshake(&req)?; - + let mut resp = handshake(&req)?; let stream = WsStream::new(req.payload_mut().readany()); + let mut ctx = HttpContext::new(req, actor); - ctx.start(resp); ctx.add_stream(stream); - Ok(ctx.into()) + + Ok(resp.body(ctx)?) } /// Prepare `WebSocket` handshake response. @@ -109,7 +107,7 @@ pub fn start(mut req: HttpRequest, actor: A) -> Result // /// `protocols` is a sequence of known protocols. On successful handshake, // /// the returned response headers contain the first protocol in this list // /// which the server also knows. -pub fn handshake(req: &HttpRequest) -> Result { +pub fn handshake(req: &HttpRequest) -> Result { // WebSocket accepts only GET if *req.method() != Method::GET { return Err(WsHandshakeError::GetMethodRequired) @@ -163,8 +161,7 @@ pub fn handshake(req: &HttpRequest) -> Result