1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 09:42:40 +01:00

Merge remote-tracking branch 'upstream/master'

This commit is contained in:
ami44 2018-01-01 12:22:03 +01:00
commit fc88bb294a
44 changed files with 305 additions and 311 deletions

View File

@ -99,3 +99,17 @@ version_check = "0.1"
lto = true lto = true
opt-level = 3 opt-level = 3
# debug = true # debug = true
[workspace]
members = [
"./",
"examples/basic",
"examples/diesel",
"examples/json",
"examples/multipart",
"examples/signals",
"examples/state",
"examples/template_tera",
"examples/tls",
"examples/websocket-chat",
]

View File

@ -2,6 +2,7 @@
name = "basic" name = "basic"
version = "0.1.0" version = "0.1.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
workspace = "../.."
[dependencies] [dependencies]
futures = "*" futures = "*"

View File

@ -9,9 +9,10 @@ use futures::Stream;
use actix::*; use actix::*;
use actix_web::*; use actix_web::*;
#[cfg(target_os = "linux")] use actix::actors::signal::{ProcessSignals, Subscribe};
use actix_web::middleware::RequestSession; use actix_web::middleware::RequestSession;
use futures::future::{FutureResult, result}; use futures::future::{FutureResult, result};
#[cfg(unix)]
use actix::actors::signal::{ProcessSignals, Subscribe};
/// simple handler /// simple handler
fn index(mut req: HttpRequest) -> Result<HttpResponse> { fn index(mut req: HttpRequest) -> Result<HttpResponse> {
@ -96,7 +97,9 @@ fn main() {
.bind("0.0.0.0:8080").unwrap() .bind("0.0.0.0:8080").unwrap()
.start(); .start();
if cfg!(target_os = "linux") { // Subscribe to unix signals // Subscribe to unix signals
#[cfg(unix)]
{
let signals = Arbiter::system_registry().get::<ProcessSignals>(); let signals = Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(addr.subscriber())); signals.send(Subscribe(addr.subscriber()));
} }

View File

@ -2,6 +2,7 @@
name = "diesel-example" name = "diesel-example"
version = "0.1.0" version = "0.1.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
workspace = "../.."
[dependencies] [dependencies]
env_logger = "0.4" env_logger = "0.4"

View File

@ -45,7 +45,7 @@ fn index(req: HttpRequest<State>) -> Box<Future<Item=HttpResponse, Error=Error>>
.and_then(|res| { .and_then(|res| {
match res { match res {
Ok(user) => Ok(httpcodes::HTTPOk.build().json(user)?), Ok(user) => Ok(httpcodes::HTTPOk.build().json(user)?),
Err(_) => Ok(httpcodes::HTTPInternalServerError.response()) Err(_) => Ok(httpcodes::HTTPInternalServerError.into())
} }
}) })
.responder() .responder()

View File

@ -2,6 +2,7 @@
name = "json-example" name = "json-example"
version = "0.1.0" version = "0.1.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
workspace = "../.."
[dependencies] [dependencies]
bytes = "0.4" bytes = "0.4"

View File

@ -9,7 +9,8 @@ extern crate serde_json;
use actix::*; use actix::*;
use actix_web::*; use actix_web::*;
#[cfg(target_os = "linux")] use actix::actors::signal::{ProcessSignals, Subscribe}; #[cfg(unix)]
use actix::actors::signal::{ProcessSignals, Subscribe};
use bytes::BytesMut; use bytes::BytesMut;
use futures::{Future, Stream}; use futures::{Future, Stream};
@ -96,7 +97,9 @@ fn main() {
.shutdown_timeout(1) .shutdown_timeout(1)
.start(); .start();
if cfg!(target_os = "linux") { // Subscribe to unix signals // Subscribe to unix signals
#[cfg(unix)]
{
let signals = Arbiter::system_registry().get::<ProcessSignals>(); let signals = Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(addr.subscriber())); signals.send(Subscribe(addr.subscriber()));
} }

View File

@ -2,6 +2,7 @@
name = "multipart-example" name = "multipart-example"
version = "0.1.0" version = "0.1.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
workspace = "../.."
[[bin]] [[bin]]
name = "multipart" name = "multipart"

View File

@ -6,7 +6,8 @@ extern crate futures;
use actix::*; use actix::*;
use actix_web::*; use actix_web::*;
#[cfg(target_os = "linux")] use actix::actors::signal::{ProcessSignals, Subscribe}; #[cfg(unix)]
use actix::actors::signal::{ProcessSignals, Subscribe};
use futures::{Future, Stream}; use futures::{Future, Stream};
use futures::future::{result, Either}; use futures::future::{result, Either};
@ -39,7 +40,7 @@ fn index(mut req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>>
} }
}) })
.finish() // <- Stream::finish() combinator from actix .finish() // <- Stream::finish() combinator from actix
.map(|_| httpcodes::HTTPOk.response()) .map(|_| httpcodes::HTTPOk.into())
.responder() .responder()
} }
@ -55,7 +56,9 @@ fn main() {
.bind("127.0.0.1:8080").unwrap() .bind("127.0.0.1:8080").unwrap()
.start(); .start();
if cfg!(target_os = "linux") { // Subscribe to unix signals // Subscribe to unix signals
#[cfg(unix)]
{
let signals = Arbiter::system_registry().get::<ProcessSignals>(); let signals = Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(addr.subscriber())); signals.send(Subscribe(addr.subscriber()));
} }

View File

@ -2,6 +2,7 @@
name = "signals" name = "signals"
version = "0.1.0" version = "0.1.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
workspace = "../.."
[[bin]] [[bin]]
name = "server" name = "server"

View File

@ -5,7 +5,8 @@ extern crate env_logger;
use actix::*; use actix::*;
use actix_web::*; use actix_web::*;
#[cfg(target_os = "linux")] use actix::actors::signal::{ProcessSignals, Subscribe}; #[cfg(unix)]
use actix::actors::signal::{ProcessSignals, Subscribe};
struct MyWebSocket; struct MyWebSocket;
@ -34,7 +35,9 @@ fn main() {
.bind("127.0.0.1:8080").unwrap() .bind("127.0.0.1:8080").unwrap()
.start(); .start();
if cfg!(target_os = "linux") { // Subscribe to unix signals // Subscribe to unix signals
#[cfg(unix)]
{
let signals = Arbiter::system_registry().get::<ProcessSignals>(); let signals = Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(addr.subscriber())); signals.send(Subscribe(addr.subscriber()));
} }

View File

@ -2,6 +2,7 @@
name = "state" name = "state"
version = "0.1.0" version = "0.1.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
workspace = "../.."
[dependencies] [dependencies]
futures = "*" futures = "*"

View File

@ -7,11 +7,14 @@ extern crate actix;
extern crate actix_web; extern crate actix_web;
extern crate env_logger; extern crate env_logger;
use actix::*;
use actix_web::*;
#[cfg(target_os = "linux")] use actix::actors::signal::{ProcessSignals, Subscribe};
use std::cell::Cell; use std::cell::Cell;
use actix::*;
use actix_web::*;
#[cfg(unix)]
use actix::actors::signal::{ProcessSignals, Subscribe};
/// Application state
struct AppState { struct AppState {
counter: Cell<usize>, counter: Cell<usize>,
} }
@ -55,7 +58,6 @@ impl Handler<ws::Message> for MyWebSocket {
} }
} }
fn main() { fn main() {
::std::env::set_var("RUST_LOG", "actix_web=info"); ::std::env::set_var("RUST_LOG", "actix_web=info");
let _ = env_logger::init(); let _ = env_logger::init();
@ -74,7 +76,9 @@ fn main() {
.bind("127.0.0.1:8080").unwrap() .bind("127.0.0.1:8080").unwrap()
.start(); .start();
if cfg!(target_os = "linux") { // Subscribe to unix signals // Subscribe to unix signals
#[cfg(unix)]
{
let signals = Arbiter::system_registry().get::<ProcessSignals>(); let signals = Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(addr.subscriber())); signals.send(Subscribe(addr.subscriber()));
} }

View File

@ -2,6 +2,7 @@
name = "template-tera" name = "template-tera"
version = "0.1.0" version = "0.1.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
workspace = "../.."
[dependencies] [dependencies]
env_logger = "0.4" env_logger = "0.4"

View File

@ -6,7 +6,9 @@ extern crate tera;
use actix::*; use actix::*;
use actix_web::*; use actix_web::*;
#[cfg(target_os = "linux")] use actix::actors::signal::{ProcessSignals, Subscribe}; #[cfg(unix)]
use actix::actors::signal::{ProcessSignals, Subscribe};
struct State { struct State {
template: tera::Tera, // <- store tera template in application state template: tera::Tera, // <- store tera template in application state
@ -43,7 +45,8 @@ fn main() {
.bind("127.0.0.1:8080").unwrap() .bind("127.0.0.1:8080").unwrap()
.start(); .start();
if cfg!(target_os = "linux") { // Subscribe to unix signals #[cfg(unix)]
{ // Subscribe to unix signals
let signals = Arbiter::system_registry().get::<ProcessSignals>(); let signals = Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(addr.subscriber())); signals.send(Subscribe(addr.subscriber()));
} }

View File

@ -2,6 +2,7 @@
name = "tls-example" name = "tls-example"
version = "0.1.0" version = "0.1.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
workspace = "../.."
[[bin]] [[bin]]
name = "server" name = "server"

View File

@ -8,7 +8,8 @@ use std::io::Read;
use actix::*; use actix::*;
use actix_web::*; use actix_web::*;
#[cfg(target_os = "linux")] use actix::actors::signal::{ProcessSignals, Subscribe}; #[cfg(unix)]
use actix::actors::signal::{ProcessSignals, Subscribe};
/// somple handle /// somple handle
fn index(req: HttpRequest) -> Result<HttpResponse> { fn index(req: HttpRequest) -> Result<HttpResponse> {
@ -47,7 +48,9 @@ fn main() {
.bind("127.0.0.1:8443").unwrap() .bind("127.0.0.1:8443").unwrap()
.start_ssl(&pkcs12).unwrap(); .start_ssl(&pkcs12).unwrap();
if cfg!(target_os = "linux") { // Subscribe to unix signals // Subscribe to unix signals
#[cfg(unix)]
{
let signals = Arbiter::system_registry().get::<ProcessSignals>(); let signals = Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(addr.subscriber())); signals.send(Subscribe(addr.subscriber()));
} }

View File

@ -2,6 +2,7 @@
name = "websocket-example" name = "websocket-example"
version = "0.1.0" version = "0.1.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
workspace = "../.."
[[bin]] [[bin]]
name = "server" name = "server"

View File

@ -17,7 +17,8 @@ use std::time::Instant;
use actix::*; use actix::*;
use actix_web::*; use actix_web::*;
#[cfg(target_os = "linux")] use actix::actors::signal::{ProcessSignals, Subscribe}; #[cfg(unix)]
use actix::actors::signal::{ProcessSignals, Subscribe};
mod codec; mod codec;
mod server; mod server;
@ -30,7 +31,7 @@ struct WsChatSessionState {
} }
/// Entry point for our route /// Entry point for our route
fn chat_route(req: HttpRequest<WsChatSessionState>) -> Result<Reply> { fn chat_route(req: HttpRequest<WsChatSessionState>) -> Result<HttpResponse> {
ws::start( ws::start(
req, req,
WsChatSession { WsChatSession {
@ -215,7 +216,9 @@ fn main() {
.bind("127.0.0.1:8080").unwrap() .bind("127.0.0.1:8080").unwrap()
.start(); .start();
if cfg!(target_os = "linux") { // Subscribe to unix signals // Subscribe to unix signals
#[cfg(unix)]
{
let signals = Arbiter::system_registry().get::<ProcessSignals>(); let signals = Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(addr.subscriber())); signals.send(Subscribe(addr.subscriber()));
} }

View File

@ -13,8 +13,8 @@ use actix_web::*;
#[cfg(target_os = "linux")] use actix::actors::signal::{ProcessSignals, Subscribe}; #[cfg(target_os = "linux")] use actix::actors::signal::{ProcessSignals, Subscribe};
/// do websocket handshake and start `MyWebSocket` actor /// do websocket handshake and start `MyWebSocket` actor
fn ws_index(r: HttpRequest) -> Reply { fn ws_index(r: HttpRequest) -> Result<HttpResponse, Error> {
ws::start(r, MyWebSocket).into() ws::start(r, MyWebSocket)
} }
/// websocket connection is long running connection, it easier /// websocket connection is long running connection, it easier

View File

@ -72,7 +72,7 @@ Create `Logger` middleware with the specified `format`.
Default `Logger` could be created with `default` method, it uses the default format: Default `Logger` could be created with `default` method, it uses the default format:
```ignore ```ignore
%a %t "%r" %s %b "%{Referrer}i" "%{User-Agent}i" %T %a %t "%r" %s %b "%{Referer}i" "%{User-Agent}i" %T
``` ```
```rust ```rust
# extern crate actix_web; # extern crate actix_web;

View File

@ -110,7 +110,7 @@ fn index(req: HttpRequest<State>) -> Box<Future<Item=HttpResponse, Error=Error>>
.and_then(|res| { .and_then(|res| {
match res { match res {
Ok(user) => Ok(httpcodes::HTTPOk.build().json(user)?), Ok(user) => Ok(httpcodes::HTTPOk.build().json(user)?),
Err(_) => Ok(httpcodes::HTTPInternalServerError.response()) Err(_) => Ok(httpcodes::HTTPInternalServerError.into())
} }
}) })
.responder() .responder()

View File

@ -65,7 +65,7 @@ impl<S> Handler<S> for MyHandler {
/// Handle request /// Handle request
fn handle(&mut self, req: HttpRequest<S>) -> Self::Result { fn handle(&mut self, req: HttpRequest<S>) -> Self::Result {
self.0 += 1; self.0 += 1;
httpcodes::HTTPOk.response() httpcodes::HTTPOk.into()
} }
} }
# fn main() {} # fn main() {}
@ -91,7 +91,7 @@ impl<S> Handler<S> for MyHandler {
fn handle(&mut self, req: HttpRequest<S>) -> Self::Result { fn handle(&mut self, req: HttpRequest<S>) -> Self::Result {
let num = self.0.load(Ordering::Relaxed) + 1; let num = self.0.load(Ordering::Relaxed) + 1;
self.0.store(num, Ordering::Relaxed); self.0.store(num, Ordering::Relaxed);
httpcodes::HTTPOk.response() httpcodes::HTTPOk.into()
} }
} }

View File

@ -246,7 +246,7 @@ fn index(mut req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>> {
.from_err() .from_err()
.and_then(|params| { // <- url encoded parameters .and_then(|params| { // <- url encoded parameters
println!("==== BODY ==== {:?}", params); println!("==== BODY ==== {:?}", params);
ok(httpcodes::HTTPOk.response()) ok(httpcodes::HTTPOk.into())
}) })
.responder() .responder()
} }

View File

@ -20,10 +20,10 @@ use actix_web::test::TestRequest;
fn index(req: HttpRequest) -> HttpResponse { fn index(req: HttpRequest) -> HttpResponse {
if let Some(hdr) = req.headers().get(header::CONTENT_TYPE) { if let Some(hdr) = req.headers().get(header::CONTENT_TYPE) {
if let Ok(s) = hdr.to_str() { if let Ok(s) = hdr.to_str() {
return httpcodes::HTTPOk.response() return httpcodes::HTTPOk.into()
} }
} }
httpcodes::HTTPBadRequest.response() httpcodes::HTTPBadRequest.into()
} }
fn main() { fn main() {
@ -60,7 +60,7 @@ use actix_web::*;
use actix_web::test::TestServer; use actix_web::test::TestServer;
fn index(req: HttpRequest) -> HttpResponse { fn index(req: HttpRequest) -> HttpResponse {
httpcodes::HTTPOk.response() httpcodes::HTTPOk.into()
} }
fn main() { fn main() {
@ -80,7 +80,7 @@ use actix_web::*;
use actix_web::test::TestServer; use actix_web::test::TestServer;
fn index(req: HttpRequest) -> HttpResponse { fn index(req: HttpRequest) -> HttpResponse {
httpcodes::HTTPOk.response() httpcodes::HTTPOk.into()
} }
/// This function get called by http server. /// This function get called by http server.

View File

@ -128,7 +128,7 @@ impl<S> Application<S> where S: 'static {
} }
} }
/// Set application prefix. /// Set application prefix
/// ///
/// Only requests that matches application's prefix get processed by this application. /// Only requests that matches application's prefix get processed by this application.
/// Application prefix always contains laading "/" slash. If supplied prefix /// Application prefix always contains laading "/" slash. If supplied prefix

View File

@ -5,6 +5,7 @@ use bytes::{Bytes, BytesMut};
use futures::Stream; use futures::Stream;
use error::Error; use error::Error;
use context::ActorHttpContext;
/// Type represent streaming body /// Type represent streaming body
pub type BodyStream = Box<Stream<Item=Bytes, Error=Error>>; pub type BodyStream = Box<Stream<Item=Bytes, Error=Error>>;
@ -18,12 +19,8 @@ pub enum Body {
/// Unspecified streaming response. Developer is responsible for setting /// Unspecified streaming response. Developer is responsible for setting
/// right `Content-Length` or `Transfer-Encoding` headers. /// right `Content-Length` or `Transfer-Encoding` headers.
Streaming(BodyStream), Streaming(BodyStream),
/// Upgrade connection. /// Special body type for actor response.
Upgrade(BodyStream), Actor(Box<ActorHttpContext>),
/// Special body type for actor streaming response.
StreamingContext,
/// Special body type for actor upgrade response.
UpgradeContext,
} }
/// Represents various types of binary body. /// Represents various types of binary body.
@ -51,8 +48,7 @@ impl Body {
#[inline] #[inline]
pub fn is_streaming(&self) -> bool { pub fn is_streaming(&self) -> bool {
match *self { match *self {
Body::Streaming(_) | Body::StreamingContext Body::Streaming(_) | Body::Actor(_) => true,
| Body::Upgrade(_) | Body::UpgradeContext => true,
_ => false _ => false
} }
} }
@ -83,15 +79,7 @@ impl PartialEq for Body {
Body::Binary(ref b2) => b == b2, Body::Binary(ref b2) => b == b2,
_ => false, _ => false,
}, },
Body::StreamingContext => match *other { Body::Streaming(_) | Body::Actor(_) => false,
Body::StreamingContext => true,
_ => false,
},
Body::UpgradeContext => match *other {
Body::UpgradeContext => true,
_ => false,
},
Body::Streaming(_) | Body::Upgrade(_) => false,
} }
} }
} }
@ -102,9 +90,7 @@ impl fmt::Debug for Body {
Body::Empty => write!(f, "Body::Empty"), Body::Empty => write!(f, "Body::Empty"),
Body::Binary(ref b) => write!(f, "Body::Binary({:?})", b), Body::Binary(ref b) => write!(f, "Body::Binary({:?})", b),
Body::Streaming(_) => write!(f, "Body::Streaming(_)"), Body::Streaming(_) => write!(f, "Body::Streaming(_)"),
Body::Upgrade(_) => write!(f, "Body::Upgrade(_)"), Body::Actor(_) => write!(f, "Body::Actor(_)"),
Body::StreamingContext => write!(f, "Body::StreamingContext"),
Body::UpgradeContext => write!(f, "Body::UpgradeContext"),
} }
} }
} }
@ -115,6 +101,12 @@ impl<T> From<T> for Body where T: Into<Binary>{
} }
} }
impl From<Box<ActorHttpContext>> for Body {
fn from(ctx: Box<ActorHttpContext>) -> Body {
Body::Actor(ctx)
}
}
impl Binary { impl Binary {
#[inline] #[inline]
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {

View File

@ -1,6 +1,7 @@
use std; use std;
use std::marker::PhantomData;
use std::collections::VecDeque; use std::collections::VecDeque;
use futures::{Async, Poll}; use futures::{Async, Future, Poll};
use futures::sync::oneshot::Sender; use futures::sync::oneshot::Sender;
use futures::unsync::oneshot; use futures::unsync::oneshot;
@ -11,19 +12,18 @@ use actix::dev::{AsyncContextApi, ActorAddressCell, ActorItemsCell, ActorWaitCel
Envelope, ToEnvelope, RemoteEnvelope}; Envelope, ToEnvelope, RemoteEnvelope};
use body::{Body, Binary}; use body::{Body, Binary};
use error::Error; use error::{Error, Result};
use httprequest::HttpRequest; use httprequest::HttpRequest;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
pub(crate) trait IoContext: 'static { pub trait ActorHttpContext: 'static {
fn disconnected(&mut self); fn disconnected(&mut self);
fn poll(&mut self) -> Poll<Option<Frame>, Error>; fn poll(&mut self) -> Poll<Option<Frame>, Error>;
} }
#[derive(Debug)] #[derive(Debug)]
pub(crate) enum Frame { pub enum Frame {
Message(HttpResponse),
Payload(Option<Binary>), Payload(Option<Binary>),
Drain(oneshot::Sender<()>), Drain(oneshot::Sender<()>),
} }
@ -31,7 +31,7 @@ pub(crate) enum Frame {
/// Http actor execution context /// Http actor execution context
pub struct HttpContext<A, S=()> where A: Actor<Context=HttpContext<A, S>>, pub struct HttpContext<A, S=()> where A: Actor<Context=HttpContext<A, S>>,
{ {
act: A, act: Option<A>,
state: ActorState, state: ActorState,
modified: bool, modified: bool,
items: ActorItemsCell<A>, items: ActorItemsCell<A>,
@ -39,7 +39,6 @@ pub struct HttpContext<A, S=()> where A: Actor<Context=HttpContext<A, S>>,
stream: VecDeque<Frame>, stream: VecDeque<Frame>,
wait: ActorWaitCell<A>, wait: ActorWaitCell<A>,
request: HttpRequest<S>, request: HttpRequest<S>,
streaming: bool,
disconnected: bool, disconnected: bool,
} }
@ -101,12 +100,15 @@ impl<A, S> AsyncContextApi<A> for HttpContext<A, S> where A: Actor<Context=Self>
} }
} }
impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> { impl<A, S: 'static> HttpContext<A, S> where A: Actor<Context=Self> {
pub fn new(req: HttpRequest<S>, actor: A) -> HttpContext<A, S> pub fn new(req: HttpRequest<S>, actor: A) -> HttpContext<A, S> {
{ HttpContext::from_request(req).actor(actor)
}
pub fn from_request(req: HttpRequest<S>) -> HttpContext<A, S> {
HttpContext { HttpContext {
act: actor, act: None,
state: ActorState::Started, state: ActorState::Started,
modified: false, modified: false,
items: ActorItemsCell::default(), items: ActorItemsCell::default(),
@ -114,10 +116,24 @@ impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> {
wait: ActorWaitCell::default(), wait: ActorWaitCell::default(),
stream: VecDeque::new(), stream: VecDeque::new(),
request: req, request: req,
streaming: false,
disconnected: false, disconnected: false,
} }
} }
pub fn actor(mut self, actor: A) -> HttpContext<A, S> {
self.act = Some(actor);
self
}
pub fn with_actor(mut self, actor: A, mut resp: HttpResponse) -> Result<HttpResponse> {
if self.act.is_some() {
panic!("Actor is set already");
}
self.act = Some(actor);
resp.replace_body(Body::Actor(Box::new(self)));
Ok(resp)
}
} }
impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> { impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> {
@ -132,24 +148,12 @@ impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> {
&mut self.request &mut self.request
} }
/// Send response to peer
pub fn start<R: Into<HttpResponse>>(&mut self, response: R) {
let resp = response.into();
match *resp.body() {
Body::StreamingContext | Body::UpgradeContext => self.streaming = true,
_ => (),
}
self.stream.push_back(Frame::Message(resp))
}
/// Write payload /// Write payload
pub fn write<B: Into<Binary>>(&mut self, data: B) { pub fn write<B: Into<Binary>>(&mut self, data: B) {
if self.streaming {
if !self.disconnected { if !self.disconnected {
self.stream.push_back(Frame::Payload(Some(data.into()))) self.stream.push_back(Frame::Payload(Some(data.into())));
}
} else { } else {
warn!("Trying to write response body for non-streaming response"); warn!("Trying to write to disconnected response");
} }
} }
@ -159,11 +163,11 @@ impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> {
} }
/// Returns drain future /// Returns drain future
pub fn drain(&mut self) -> oneshot::Receiver<()> { pub fn drain(&mut self) -> Drain<A> {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
self.modified = true; self.modified = true;
self.stream.push_back(Frame::Drain(tx)); self.stream.push_back(Frame::Drain(tx));
rx Drain::new(rx)
} }
/// Check if connection still open /// Check if connection still open
@ -193,7 +197,7 @@ impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> {
} }
} }
impl<A, S> IoContext for HttpContext<A, S> where A: Actor<Context=Self>, S: 'static { impl<A, S> ActorHttpContext for HttpContext<A, S> where A: Actor<Context=Self>, S: 'static {
fn disconnected(&mut self) { fn disconnected(&mut self) {
self.items.stop(); self.items.stop();
@ -204,8 +208,11 @@ impl<A, S> IoContext for HttpContext<A, S> where A: Actor<Context=Self>, S: 'sta
} }
fn poll(&mut self) -> Poll<Option<Frame>, Error> { fn poll(&mut self) -> Poll<Option<Frame>, Error> {
if self.act.is_none() {
return Ok(Async::Ready(None))
}
let act: &mut A = unsafe { let act: &mut A = unsafe {
std::mem::transmute(&mut self.act as &mut A) std::mem::transmute(self.act.as_mut().unwrap() as &mut A)
}; };
let ctx: &mut HttpContext<A, S> = unsafe { let ctx: &mut HttpContext<A, S> = unsafe {
std::mem::transmute(self as &mut HttpContext<A, S>) std::mem::transmute(self as &mut HttpContext<A, S>)
@ -303,3 +310,39 @@ impl<A, S> ToEnvelope<A> for HttpContext<A, S>
RemoteEnvelope::new(msg, tx).into() RemoteEnvelope::new(msg, tx).into()
} }
} }
impl<A, S> From<HttpContext<A, S>> for Body
where A: Actor<Context=HttpContext<A, S>>,
S: 'static
{
fn from(ctx: HttpContext<A, S>) -> Body {
Body::Actor(Box::new(ctx))
}
}
pub struct Drain<A> {
fut: oneshot::Receiver<()>,
_a: PhantomData<A>,
}
impl<A> Drain<A> {
fn new(fut: oneshot::Receiver<()>) -> Self {
Drain {
fut: fut,
_a: PhantomData
}
}
}
impl<A: Actor> ActorFuture for Drain<A> {
type Item = ();
type Error = ();
type Actor = A;
fn poll(&mut self,
_: &mut A,
_: &mut <Self::Actor as Actor>::Context) -> Poll<Self::Item, Self::Error>
{
self.fut.poll().map_err(|_| ())
}
}

View File

@ -416,8 +416,20 @@ impl PayloadEncoder {
resp.headers_mut().remove(CONTENT_LENGTH); resp.headers_mut().remove(CONTENT_LENGTH);
TransferEncoding::eof(buf) TransferEncoding::eof(buf)
} }
Body::Streaming(_) | Body::StreamingContext => { Body::Streaming(_) | Body::Actor(_) => {
if resp.chunked() { if resp.upgrade() {
if version == Version::HTTP_2 {
error!("Connection upgrade is forbidden for HTTP/2");
} else {
resp.headers_mut().insert(
CONNECTION, HeaderValue::from_static("upgrade"));
}
if encoding != ContentEncoding::Identity {
encoding = ContentEncoding::Identity;
resp.headers_mut().remove(CONTENT_ENCODING);
}
TransferEncoding::eof(buf)
} else if resp.chunked() {
resp.headers_mut().remove(CONTENT_LENGTH); resp.headers_mut().remove(CONTENT_LENGTH);
if version != Version::HTTP_11 { if version != Version::HTTP_11 {
error!("Chunked transfer encoding is forbidden for {:?}", version); error!("Chunked transfer encoding is forbidden for {:?}", version);
@ -446,19 +458,6 @@ impl PayloadEncoder {
TransferEncoding::eof(buf) TransferEncoding::eof(buf)
} }
} }
Body::Upgrade(_) | Body::UpgradeContext => {
if version == Version::HTTP_2 {
error!("Connection upgrade is forbidden for HTTP/2");
} else {
resp.headers_mut().insert(
CONNECTION, HeaderValue::from_static("upgrade"));
}
if encoding != ContentEncoding::Identity {
encoding = ContentEncoding::Identity;
resp.headers_mut().remove(CONTENT_ENCODING);
}
TransferEncoding::eof(buf)
}
}; };
resp.replace_body(body); resp.replace_body(body);

View File

@ -114,14 +114,6 @@ impl ResponseError for header::InvalidHeaderValue {}
/// `InternalServerError` for `futures::Canceled` /// `InternalServerError` for `futures::Canceled`
impl ResponseError for Canceled {} impl ResponseError for Canceled {}
/// Internal error
#[doc(hidden)]
#[derive(Fail, Debug)]
#[fail(display="Unexpected task frame")]
pub struct UnexpectedTaskFrame;
impl ResponseError for UnexpectedTaskFrame {}
/// A set of errors that can occur during parsing HTTP streams /// A set of errors that can occur during parsing HTTP streams
#[derive(Fail, Debug)] #[derive(Fail, Debug)]
pub enum ParseError { pub enum ParseError {

View File

@ -192,7 +192,6 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
if let Body::Binary(bytes) = body { if let Body::Binary(bytes) = body {
self.encoder.write(bytes.as_ref())?; self.encoder.write(bytes.as_ref())?;
return Ok(WriterState::Done)
} else { } else {
msg.replace_body(body); msg.replace_body(body);
} }

View File

@ -92,7 +92,7 @@ impl H2Writer {
let cap = cmp::min(buffer.len(), CHUNK_SIZE); let cap = cmp::min(buffer.len(), CHUNK_SIZE);
stream.reserve_capacity(cap); stream.reserve_capacity(cap);
} else { } else {
return Ok(WriterState::Done) return Ok(WriterState::Pause)
} }
} }
Err(_) => { Err(_) => {
@ -130,9 +130,23 @@ impl Writer for H2Writer {
// using helpers::date is quite a lot faster // using helpers::date is quite a lot faster
if !msg.headers().contains_key(DATE) { if !msg.headers().contains_key(DATE) {
let mut bytes = BytesMut::with_capacity(29); let mut bytes = BytesMut::with_capacity(29);
helpers::date(&mut bytes); helpers::date_value(&mut bytes);
msg.headers_mut().insert(DATE, HeaderValue::try_from(bytes.freeze()).unwrap());
}
let body = msg.replace_body(Body::Empty);
match body {
Body::Binary(ref bytes) => {
let mut val = BytesMut::new();
helpers::convert_usize(bytes.len(), &mut val);
let l = val.len();
msg.headers_mut().insert( msg.headers_mut().insert(
DATE, HeaderValue::try_from(bytes.freeze()).unwrap()); CONTENT_LENGTH, HeaderValue::try_from(val.split_to(l-2).freeze()).unwrap());
}
Body::Empty => {
msg.headers_mut().insert(CONTENT_LENGTH, HeaderValue::from_static("0"));
},
_ => (),
} }
let mut resp = Response::new(()); let mut resp = Response::new(());
@ -142,19 +156,6 @@ impl Writer for H2Writer {
resp.headers_mut().insert(key, value.clone()); resp.headers_mut().insert(key, value.clone());
} }
match *msg.body() {
Body::Binary(ref bytes) => {
let mut val = BytesMut::new();
helpers::convert_usize(bytes.len(), &mut val);
resp.headers_mut().insert(
CONTENT_LENGTH, HeaderValue::try_from(val.freeze()).unwrap());
}
Body::Empty => {
resp.headers_mut().insert(CONTENT_LENGTH, HeaderValue::from_static("0"));
},
_ => (),
}
match self.respond.send_response(resp, self.flags.contains(Flags::EOF)) { match self.respond.send_response(resp, self.flags.contains(Flags::EOF)) {
Ok(stream) => Ok(stream) =>
self.stream = Some(stream), self.stream = Some(stream),
@ -162,21 +163,20 @@ impl Writer for H2Writer {
return Err(io::Error::new(io::ErrorKind::Other, "err")), return Err(io::Error::new(io::ErrorKind::Other, "err")),
} }
// trace!("Response: {:?}", msg); trace!("Response: {:?}", msg);
if msg.body().is_binary() { if let Body::Binary(bytes) = body {
if let Body::Binary(bytes) = msg.replace_body(Body::Empty) {
self.flags.insert(Flags::EOF); self.flags.insert(Flags::EOF);
self.encoder.write(bytes.as_ref())?; self.encoder.write(bytes.as_ref())?;
if let Some(ref mut stream) = self.stream { if let Some(ref mut stream) = self.stream {
stream.reserve_capacity(cmp::min(self.encoder.len(), CHUNK_SIZE)); stream.reserve_capacity(cmp::min(self.encoder.len(), CHUNK_SIZE));
} }
return Ok(WriterState::Done) Ok(WriterState::Pause)
} } else {
} msg.replace_body(body);
Ok(WriterState::Done) Ok(WriterState::Done)
} }
}
fn write(&mut self, payload: &[u8]) -> Result<WriterState, io::Error> { fn write(&mut self, payload: &[u8]) -> Result<WriterState, io::Error> {
if !self.flags.contains(Flags::DISCONNECTED) { if !self.flags.contains(Flags::DISCONNECTED) {

View File

@ -1,13 +1,11 @@
use std::marker::PhantomData; use std::marker::PhantomData;
use actix::Actor;
use futures::future::{Future, ok, err};
use regex::Regex; use regex::Regex;
use futures::future::{Future, ok, err};
use http::{header, StatusCode, Error as HttpError}; use http::{header, StatusCode, Error as HttpError};
use body::Body; use body::Body;
use error::Error; use error::Error;
use context::{HttpContext, IoContext};
use httprequest::HttpRequest; use httprequest::HttpRequest;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
@ -69,20 +67,11 @@ pub struct Reply(ReplyItem);
pub(crate) enum ReplyItem { pub(crate) enum ReplyItem {
Message(HttpResponse), Message(HttpResponse),
Actor(Box<IoContext>),
Future(Box<Future<Item=HttpResponse, Error=Error>>), Future(Box<Future<Item=HttpResponse, Error=Error>>),
} }
impl Reply { impl Reply {
/// Create actor response
#[inline]
pub fn actor<A, S>(ctx: HttpContext<A, S>) -> Reply
where A: Actor<Context=HttpContext<A, S>>, S: 'static
{
Reply(ReplyItem::Actor(Box::new(ctx)))
}
/// Create async response /// Create async response
#[inline] #[inline]
pub fn async<F>(fut: F) -> Reply pub fn async<F>(fut: F) -> Reply
@ -163,25 +152,6 @@ impl<E: Into<Error>> From<Result<Reply, E>> for Reply {
} }
} }
impl<A: Actor<Context=HttpContext<A, S>>, S: 'static> Responder for HttpContext<A, S>
{
type Item = Reply;
type Error = Error;
#[inline]
fn respond_to(self, _: HttpRequest) -> Result<Reply, Error> {
Ok(Reply(ReplyItem::Actor(Box::new(self))))
}
}
impl<A: Actor<Context=HttpContext<A, S>>, S: 'static> From<HttpContext<A, S>> for Reply {
#[inline]
fn from(ctx: HttpContext<A, S>) -> Reply {
Reply(ReplyItem::Actor(Box::new(ctx)))
}
}
impl<I, E> Responder for Box<Future<Item=I, Error=E>> impl<I, E> Responder for Box<Future<Item=I, Error=E>>
where I: Responder + 'static, where I: Responder + 'static,
E: Into<Error> + 'static E: Into<Error> + 'static

View File

@ -11,9 +11,9 @@ use http::Version;
use httprequest::HttpMessage; use httprequest::HttpMessage;
// "Sun, 06 Nov 1994 08:49:37 GMT".len() // "Sun, 06 Nov 1994 08:49:37 GMT".len()
pub const DATE_VALUE_LENGTH: usize = 29; pub(crate) const DATE_VALUE_LENGTH: usize = 29;
pub fn date(dst: &mut BytesMut) { pub(crate) fn date(dst: &mut BytesMut) {
CACHED.with(|cache| { CACHED.with(|cache| {
let mut buf: [u8; 39] = unsafe { mem::uninitialized() }; let mut buf: [u8; 39] = unsafe { mem::uninitialized() };
buf[..6].copy_from_slice(b"date: "); buf[..6].copy_from_slice(b"date: ");
@ -23,7 +23,13 @@ pub fn date(dst: &mut BytesMut) {
}) })
} }
pub fn update_date() { pub(crate) fn date_value(dst: &mut BytesMut) {
CACHED.with(|cache| {
dst.extend_from_slice(cache.borrow().buffer());
})
}
pub(crate) fn update_date() {
CACHED.with(|cache| { CACHED.with(|cache| {
cache.borrow_mut().update(); cache.borrow_mut().update();
}); });

View File

@ -54,9 +54,6 @@ impl StaticResponse {
pub fn build(&self) -> HttpResponseBuilder { pub fn build(&self) -> HttpResponseBuilder {
HttpResponse::build(self.0) HttpResponse::build(self.0)
} }
pub fn response(&self) -> HttpResponse {
HttpResponse::new(self.0, Body::Empty)
}
pub fn with_reason(self, reason: &'static str) -> HttpResponse { pub fn with_reason(self, reason: &'static str) -> HttpResponse {
let mut resp = HttpResponse::new(self.0, Body::Empty); let mut resp = HttpResponse::new(self.0, Body::Empty);
resp.set_reason(reason); resp.set_reason(reason);
@ -92,7 +89,7 @@ impl Responder for StaticResponse {
impl From<StaticResponse> for HttpResponse { impl From<StaticResponse> for HttpResponse {
fn from(st: StaticResponse) -> Self { fn from(st: StaticResponse) -> Self {
st.response() HttpResponse::new(st.0, Body::Empty)
} }
} }
@ -153,7 +150,7 @@ mod tests {
#[test] #[test]
fn test_response() { fn test_response() {
let resp = HTTPOk.response(); let resp: HttpResponse = HTTPOk.into();
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);
} }
@ -165,7 +162,7 @@ mod tests {
#[test] #[test]
fn test_with_reason() { fn test_with_reason() {
let resp = HTTPOk.response(); let resp: HttpResponse = HTTPOk.into();
assert_eq!(resp.reason(), "OK"); assert_eq!(resp.reason(), "OK");
let resp = HTTPBadRequest.with_reason("test"); let resp = HTTPBadRequest.with_reason("test");

View File

@ -135,6 +135,12 @@ impl HttpRequest<()> {
impl<S> HttpRequest<S> { impl<S> HttpRequest<S> {
#[inline]
/// Construct new http request with state.
pub fn change_state<NS>(self, state: Rc<NS>) -> HttpRequest<NS> {
HttpRequest(self.0, Some(state), self.2.clone())
}
#[inline] #[inline]
/// Construct new http request without state. /// Construct new http request without state.
pub(crate) fn clone_without_state(&self) -> HttpRequest { pub(crate) fn clone_without_state(&self) -> HttpRequest {
@ -447,7 +453,7 @@ impl<S> HttpRequest<S> {
/// } /// }
/// }) /// })
/// .finish() // <- Stream::finish() combinator from actix /// .finish() // <- Stream::finish() combinator from actix
/// .map(|_| httpcodes::HTTPOk.response()) /// .map(|_| httpcodes::HTTPOk.into())
/// .responder() /// .responder()
/// } /// }
/// # fn main() {} /// # fn main() {}
@ -477,7 +483,7 @@ impl<S> HttpRequest<S> {
/// .from_err() /// .from_err()
/// .and_then(|params| { // <- url encoded parameters /// .and_then(|params| { // <- url encoded parameters
/// println!("==== BODY ==== {:?}", params); /// println!("==== BODY ==== {:?}", params);
/// ok(httpcodes::HTTPOk.response()) /// ok(httpcodes::HTTPOk.into())
/// }) /// })
/// .responder() /// .responder()
/// } /// }
@ -512,7 +518,7 @@ impl<S> HttpRequest<S> {
/// .from_err() /// .from_err()
/// .and_then(|val: MyObj| { // <- deserialized value /// .and_then(|val: MyObj| { // <- deserialized value
/// println!("==== BODY ==== {:?}", val); /// println!("==== BODY ==== {:?}", val);
/// Ok(httpcodes::HTTPOk.response()) /// Ok(httpcodes::HTTPOk.into())
/// }).responder() /// }).responder()
/// } /// }
/// # fn main() {} /// # fn main() {}

View File

@ -138,6 +138,7 @@ impl HttpResponse {
} }
/// Connection upgrade status /// Connection upgrade status
#[inline]
pub fn upgrade(&self) -> bool { pub fn upgrade(&self) -> bool {
self.get_ref().connection_type == Some(ConnectionType::Upgrade) self.get_ref().connection_type == Some(ConnectionType::Upgrade)
} }
@ -155,11 +156,13 @@ impl HttpResponse {
} }
/// is chunked encoding enabled /// is chunked encoding enabled
#[inline]
pub fn chunked(&self) -> bool { pub fn chunked(&self) -> bool {
self.get_ref().chunked self.get_ref().chunked
} }
/// Content encoding /// Content encoding
#[inline]
pub fn content_encoding(&self) -> &ContentEncoding { pub fn content_encoding(&self) -> &ContentEncoding {
&self.get_ref().encoding &self.get_ref().encoding
} }
@ -171,6 +174,7 @@ impl HttpResponse {
} }
/// Get body os this response /// Get body os this response
#[inline]
pub fn body(&self) -> &Body { pub fn body(&self) -> &Body {
&self.get_ref().body &self.get_ref().body
} }
@ -443,6 +447,15 @@ impl HttpResponseBuilder {
pub fn finish(&mut self) -> Result<HttpResponse, HttpError> { pub fn finish(&mut self) -> Result<HttpResponse, HttpError> {
self.body(Body::Empty) self.body(Body::Empty)
} }
/// This method construct new `HttpResponseBuilder`
pub fn take(&mut self) -> HttpResponseBuilder {
HttpResponseBuilder {
response: self.response.take(),
err: self.err.take(),
cookies: self.cookies.take(),
}
}
} }
#[inline] #[inline]

View File

@ -71,7 +71,7 @@ impl<T: Serialize> Responder for Json<T> {
/// .from_err() /// .from_err()
/// .and_then(|val: MyObj| { // <- deserialized value /// .and_then(|val: MyObj| { // <- deserialized value
/// println!("==== BODY ==== {:?}", val); /// println!("==== BODY ==== {:?}", val);
/// Ok(httpcodes::HTTPOk.response()) /// Ok(httpcodes::HTTPOk.into())
/// }).responder() /// }).responder()
/// } /// }
/// # fn main() {} /// # fn main() {}

View File

@ -19,7 +19,7 @@ use middleware::{Middleware, Started, Finished};
/// Default `Logger` could be created with `default` method, it uses the default format: /// Default `Logger` could be created with `default` method, it uses the default format:
/// ///
/// ```ignore /// ```ignore
/// %a %t "%r" %s %b "%{Referrer}i" "%{User-Agent}i" %T /// %a %t "%r" %s %b "%{Referer}i" "%{User-Agent}i" %T
/// ``` /// ```
/// ```rust /// ```rust
/// # extern crate actix_web; /// # extern crate actix_web;
@ -75,7 +75,7 @@ impl Default for Logger {
/// Create `Logger` middleware with format: /// Create `Logger` middleware with format:
/// ///
/// ```ignore /// ```ignore
/// %a %t "%r" %s %b "%{Referrer}i" "%{User-Agent}i" %T /// %a %t "%r" %s %b "%{Referer}i" "%{User-Agent}i" %T
/// ``` /// ```
fn default() -> Logger { fn default() -> Logger {
Logger { format: Format::default() } Logger { format: Format::default() }
@ -121,7 +121,7 @@ struct Format(Vec<FormatText>);
impl Default for Format { impl Default for Format {
/// Return the default formatting style for the `Logger`: /// Return the default formatting style for the `Logger`:
fn default() -> Format { fn default() -> Format {
Format::new(r#"%a %t "%r" %s %b "%{Referrer}i" "%{User-Agent}i" %T"#) Format::new(r#"%a %t "%r" %s %b "%{Referer}i" "%{User-Agent}i" %T"#)
} }
} }

View File

@ -9,20 +9,14 @@ use futures::{Future, Async, Poll, Stream};
use futures::task::{Task, current as current_task}; use futures::task::{Task, current as current_task};
use body::BodyStream; use body::BodyStream;
use actix::ResponseType;
use error::PayloadError; use error::PayloadError;
pub(crate) const DEFAULT_BUFFER_SIZE: usize = 65_536; // max buffer size 64k pub(crate) const DEFAULT_BUFFER_SIZE: usize = 65_536; // max buffer size 64k
/// Just Bytes object /// Just Bytes object
#[derive(PartialEq)] #[derive(PartialEq, Message)]
pub struct PayloadItem(pub Bytes); pub struct PayloadItem(pub Bytes);
impl ResponseType for PayloadItem {
type Item = ();
type Error = ();
}
impl Deref for PayloadItem { impl Deref for PayloadItem {
type Target = Bytes; type Target = Bytes;
@ -91,27 +85,27 @@ impl Payload {
} }
/// Get first available chunk of data. /// Get first available chunk of data.
pub fn readany(&mut self) -> ReadAny { pub fn readany(&self) -> ReadAny {
ReadAny(Rc::clone(&self.inner)) ReadAny(Rc::clone(&self.inner))
} }
/// Get exact number of bytes /// Get exact number of bytes
pub fn readexactly(&mut self, size: usize) -> ReadExactly { pub fn readexactly(&self, size: usize) -> ReadExactly {
ReadExactly(Rc::clone(&self.inner), size) ReadExactly(Rc::clone(&self.inner), size)
} }
/// Read until `\n` /// Read until `\n`
pub fn readline(&mut self) -> ReadLine { pub fn readline(&self) -> ReadLine {
ReadLine(Rc::clone(&self.inner)) ReadLine(Rc::clone(&self.inner))
} }
/// Read until match line /// Read until match line
pub fn readuntil(&mut self, line: &[u8]) -> ReadUntil { pub fn readuntil(&self, line: &[u8]) -> ReadUntil {
ReadUntil(Rc::clone(&self.inner), line.to_vec()) ReadUntil(Rc::clone(&self.inner), line.to_vec())
} }
#[doc(hidden)] #[doc(hidden)]
pub fn readall(&mut self) -> Option<Bytes> { pub fn readall(&self) -> Option<Bytes> {
self.inner.borrow_mut().readall() self.inner.borrow_mut().readall()
} }
@ -132,20 +126,16 @@ impl Payload {
/// Convert payload into compatible `HttpResponse` body stream /// Convert payload into compatible `HttpResponse` body stream
pub fn stream(self) -> BodyStream { pub fn stream(self) -> BodyStream {
Box::new(self.map_err(|e| e.into())) Box::new(self.map(|i| i.0).map_err(|e| e.into()))
} }
} }
impl Stream for Payload { impl Stream for Payload {
type Item = Bytes; type Item = PayloadItem;
type Error = PayloadError; type Error = PayloadError;
fn poll(&mut self) -> Poll<Option<Bytes>, PayloadError> { fn poll(&mut self) -> Poll<Option<PayloadItem>, PayloadError> {
match self.inner.borrow_mut().readany()? { self.inner.borrow_mut().readany()
Async::Ready(Some(item)) => Ok(Async::Ready(Some(item.0))),
Async::Ready(None) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady),
}
} }
} }
@ -474,7 +464,7 @@ mod tests {
#[test] #[test]
fn test_basic() { fn test_basic() {
Core::new().unwrap().run(lazy(|| { Core::new().unwrap().run(lazy(|| {
let (_, mut payload) = Payload::new(false); let (_, payload) = Payload::new(false);
assert!(!payload.eof()); assert!(!payload.eof());
assert!(payload.is_empty()); assert!(payload.is_empty());
@ -489,7 +479,7 @@ mod tests {
#[test] #[test]
fn test_eof() { fn test_eof() {
Core::new().unwrap().run(lazy(|| { Core::new().unwrap().run(lazy(|| {
let (mut sender, mut payload) = Payload::new(false); let (mut sender, payload) = Payload::new(false);
assert_eq!(Async::NotReady, payload.readany().poll().ok().unwrap()); assert_eq!(Async::NotReady, payload.readany().poll().ok().unwrap());
assert!(!payload.eof()); assert!(!payload.eof());
@ -514,7 +504,7 @@ mod tests {
#[test] #[test]
fn test_err() { fn test_err() {
Core::new().unwrap().run(lazy(|| { Core::new().unwrap().run(lazy(|| {
let (mut sender, mut payload) = Payload::new(false); let (mut sender, payload) = Payload::new(false);
assert_eq!(Async::NotReady, payload.readany().poll().ok().unwrap()); assert_eq!(Async::NotReady, payload.readany().poll().ok().unwrap());
@ -528,7 +518,7 @@ mod tests {
#[test] #[test]
fn test_readany() { fn test_readany() {
Core::new().unwrap().run(lazy(|| { Core::new().unwrap().run(lazy(|| {
let (mut sender, mut payload) = Payload::new(false); let (mut sender, payload) = Payload::new(false);
sender.feed_data(Bytes::from("line1")); sender.feed_data(Bytes::from("line1"));
@ -552,7 +542,7 @@ mod tests {
#[test] #[test]
fn test_readexactly() { fn test_readexactly() {
Core::new().unwrap().run(lazy(|| { Core::new().unwrap().run(lazy(|| {
let (mut sender, mut payload) = Payload::new(false); let (mut sender, payload) = Payload::new(false);
assert_eq!(Async::NotReady, payload.readexactly(2).poll().ok().unwrap()); assert_eq!(Async::NotReady, payload.readexactly(2).poll().ok().unwrap());
@ -579,7 +569,7 @@ mod tests {
#[test] #[test]
fn test_readuntil() { fn test_readuntil() {
Core::new().unwrap().run(lazy(|| { Core::new().unwrap().run(lazy(|| {
let (mut sender, mut payload) = Payload::new(false); let (mut sender, payload) = Payload::new(false);
assert_eq!(Async::NotReady, payload.readuntil(b"ne").poll().ok().unwrap()); assert_eq!(Async::NotReady, payload.readuntil(b"ne").poll().ok().unwrap());

View File

@ -8,8 +8,8 @@ use futures::unsync::oneshot;
use channel::HttpHandlerTask; use channel::HttpHandlerTask;
use body::{Body, BodyStream}; use body::{Body, BodyStream};
use context::{Frame, IoContext}; use context::{Frame, ActorHttpContext};
use error::{Error, UnexpectedTaskFrame}; use error::Error;
use handler::{Reply, ReplyItem}; use handler::{Reply, ReplyItem};
use h1writer::{Writer, WriterState}; use h1writer::{Writer, WriterState};
use httprequest::HttpRequest; use httprequest::HttpRequest;
@ -38,7 +38,7 @@ struct PipelineInfo<S> {
req: HttpRequest<S>, req: HttpRequest<S>,
count: usize, count: usize,
mws: Rc<Vec<Box<Middleware<S>>>>, mws: Rc<Vec<Box<Middleware<S>>>>,
context: Option<Box<IoContext>>, context: Option<Box<ActorHttpContext>>,
error: Option<Error>, error: Option<Error>,
} }
@ -72,12 +72,6 @@ impl<S> PipelineInfo<S> {
} }
} }
enum PipelineResponse {
None,
Context(Box<IoContext>),
Response(Box<Future<Item=HttpResponse, Error=Error>>),
}
impl<S, H: PipelineHandler<S>> Pipeline<S, H> { impl<S, H: PipelineHandler<S>> Pipeline<S, H> {
pub fn new(req: HttpRequest<S>, pub fn new(req: HttpRequest<S>,
@ -364,7 +358,7 @@ impl<S, H: PipelineHandler<S>> StartMiddlewares<S, H> {
// waiting for response // waiting for response
struct WaitingResponse<S, H> { struct WaitingResponse<S, H> {
stream: PipelineResponse, fut: Box<Future<Item=HttpResponse, Error=Error>>,
_s: PhantomData<S>, _s: PhantomData<S>,
_h: PhantomData<H>, _h: PhantomData<H>,
} }
@ -377,66 +371,23 @@ impl<S, H> WaitingResponse<S, H> {
match reply.into() { match reply.into() {
ReplyItem::Message(resp) => ReplyItem::Message(resp) =>
RunMiddlewares::init(info, resp), RunMiddlewares::init(info, resp),
ReplyItem::Actor(ctx) =>
PipelineState::Handler(
WaitingResponse { stream: PipelineResponse::Context(ctx),
_s: PhantomData, _h: PhantomData }),
ReplyItem::Future(fut) => ReplyItem::Future(fut) =>
PipelineState::Handler( PipelineState::Handler(
WaitingResponse { stream: PipelineResponse::Response(fut), WaitingResponse { fut: fut, _s: PhantomData, _h: PhantomData }),
_s: PhantomData, _h: PhantomData }),
} }
} }
fn poll(mut self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S, H>, PipelineState<S, H>> fn poll(mut self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S, H>, PipelineState<S, H>>
{ {
let stream = mem::replace(&mut self.stream, PipelineResponse::None); match self.fut.poll() {
Ok(Async::NotReady) =>
match stream { Err(PipelineState::Handler(self)),
PipelineResponse::Context(mut context) => {
loop {
match context.poll() {
Ok(Async::Ready(Some(frame))) => {
match frame {
Frame::Message(resp) => {
info.context = Some(context);
return Ok(RunMiddlewares::init(info, resp))
}
Frame::Payload(_) | Frame::Drain(_) => (),
}
},
Ok(Async::Ready(None)) => {
error!("Unexpected eof");
let err: Error = UnexpectedTaskFrame.into();
return Ok(ProcessResponse::init(err.into()))
},
Ok(Async::NotReady) => {
self.stream = PipelineResponse::Context(context);
return Err(PipelineState::Handler(self))
},
Err(err) =>
return Ok(ProcessResponse::init(err.into()))
}
}
},
PipelineResponse::Response(mut fut) => {
match fut.poll() {
Ok(Async::NotReady) => {
self.stream = PipelineResponse::Response(fut);
Err(PipelineState::Handler(self))
}
Ok(Async::Ready(response)) => Ok(Async::Ready(response)) =>
Ok(RunMiddlewares::init(info, response)), Ok(RunMiddlewares::init(info, response)),
Err(err) => Err(err) =>
Ok(ProcessResponse::init(err.into())), Ok(ProcessResponse::init(err.into())),
} }
} }
PipelineResponse::None => {
unreachable!("Broken internal state")
}
}
}
} }
/// Middlewares response executor /// Middlewares response executor
@ -554,7 +505,7 @@ impl RunningState {
enum IOState { enum IOState {
Response, Response,
Payload(BodyStream), Payload(BodyStream),
Context, Actor(Box<ActorHttpContext>),
Done, Done,
} }
@ -576,12 +527,10 @@ impl<S, H> ProcessResponse<S, H> {
{ {
if self.drain.is_none() && self.running != RunningState::Paused { if self.drain.is_none() && self.running != RunningState::Paused {
// if task is paused, write buffer is probably full // if task is paused, write buffer is probably full
loop { loop {
let result = match mem::replace(&mut self.iostate, IOState::Done) { let result = match mem::replace(&mut self.iostate, IOState::Done) {
IOState::Response => { IOState::Response => {
let result = match io.start(info.req_mut().get_inner(), let result = match io.start(info.req_mut().get_inner(), &mut self.resp) {
&mut self.resp) {
Ok(res) => res, Ok(res) => res,
Err(err) => { Err(err) => {
info.error = Some(err.into()); info.error = Some(err.into());
@ -590,10 +539,10 @@ impl<S, H> ProcessResponse<S, H> {
}; };
match self.resp.replace_body(Body::Empty) { match self.resp.replace_body(Body::Empty) {
Body::Streaming(stream) | Body::Upgrade(stream) => Body::Streaming(stream) =>
self.iostate = IOState::Payload(stream), self.iostate = IOState::Payload(stream),
Body::StreamingContext | Body::UpgradeContext => Body::Actor(ctx) =>
self.iostate = IOState::Context, self.iostate = IOState::Actor(ctx),
_ => (), _ => (),
} }
@ -642,17 +591,12 @@ impl<S, H> ProcessResponse<S, H> {
} }
} }
}, },
IOState::Context => { IOState::Actor(mut ctx) => {
match info.context.as_mut().unwrap().poll() { match ctx.poll() {
Ok(Async::Ready(Some(frame))) => { Ok(Async::Ready(Some(frame))) => {
match frame { match frame {
Frame::Message(msg) => {
error!("Unexpected message frame {:?}", msg);
info.error = Some(UnexpectedTaskFrame.into());
return Ok(
FinishingMiddlewares::init(info, self.resp))
},
Frame::Payload(None) => { Frame::Payload(None) => {
info.context = Some(ctx);
self.iostate = IOState::Done; self.iostate = IOState::Done;
if let Err(err) = io.write_eof() { if let Err(err) = io.write_eof() {
info.error = Some(err.into()); info.error = Some(err.into());
@ -662,7 +606,7 @@ impl<S, H> ProcessResponse<S, H> {
break break
}, },
Frame::Payload(Some(chunk)) => { Frame::Payload(Some(chunk)) => {
self.iostate = IOState::Context; self.iostate = IOState::Actor(ctx);
match io.write(chunk.as_ref()) { match io.write(chunk.as_ref()) {
Err(err) => { Err(err) => {
info.error = Some(err.into()); info.error = Some(err.into());
@ -680,11 +624,10 @@ impl<S, H> ProcessResponse<S, H> {
}, },
Ok(Async::Ready(None)) => { Ok(Async::Ready(None)) => {
self.iostate = IOState::Done; self.iostate = IOState::Done;
info.context.take();
break break
} }
Ok(Async::NotReady) => { Ok(Async::NotReady) => {
self.iostate = IOState::Context; self.iostate = IOState::Actor(ctx);
break break
} }
Err(err) => { Err(err) => {

View File

@ -190,7 +190,7 @@ impl Pattern {
} }
/// Extract pattern parameters from the text /// Extract pattern parameters from the text
pub(crate) fn update_match_info<S>(&self, text: &str, req: &mut HttpRequest<S>) { pub fn update_match_info<S>(&self, text: &str, req: &mut HttpRequest<S>) {
if !self.names.is_empty() { if !self.names.is_empty() {
if let Some(captures) = self.re.captures(text) { if let Some(captures) = self.re.captures(text) {
let mut idx = 0; let mut idx = 0;
@ -208,8 +208,7 @@ impl Pattern {
} }
/// Build pattern path. /// Build pattern path.
pub fn path<U, I>(&self, prefix: Option<&str>, elements: U) pub fn path<U, I>(&self, prefix: Option<&str>, elements: U) -> Result<String, UrlGenerationError>
-> Result<String, UrlGenerationError>
where U: IntoIterator<Item=I>, where U: IntoIterator<Item=I>,
I: AsRef<str>, I: AsRef<str>,
{ {

View File

@ -41,7 +41,7 @@ use httpresponse::HttpResponse;
/// # extern crate reqwest; /// # extern crate reqwest;
/// # /// #
/// # fn my_handler(req: HttpRequest) -> HttpResponse { /// # fn my_handler(req: HttpRequest) -> HttpResponse {
/// # httpcodes::HTTPOk.response() /// # httpcodes::HTTPOk.into()
/// # } /// # }
/// # /// #
/// # fn main() { /// # fn main() {
@ -228,9 +228,9 @@ impl<S: 'static> Iterator for TestApp<S> {
/// ///
/// fn index(req: HttpRequest) -> HttpResponse { /// fn index(req: HttpRequest) -> HttpResponse {
/// if let Some(hdr) = req.headers().get(header::CONTENT_TYPE) { /// if let Some(hdr) = req.headers().get(header::CONTENT_TYPE) {
/// httpcodes::HTTPOk.response() /// httpcodes::HTTPOk.into()
/// } else { /// } else {
/// httpcodes::HTTPBadRequest.response() /// httpcodes::HTTPBadRequest.into()
/// } /// }
/// } /// }
/// ///
@ -365,7 +365,6 @@ impl<S> TestRequest<S> {
Ok(resp) => { Ok(resp) => {
match resp.into().into() { match resp.into().into() {
ReplyItem::Message(resp) => Ok(resp), ReplyItem::Message(resp) => Ok(resp),
ReplyItem::Actor(_) => panic!("Actor handler is not supported."),
ReplyItem::Future(_) => panic!("Async handler is not supported."), ReplyItem::Future(_) => panic!("Async handler is not supported."),
} }
}, },

View File

@ -12,7 +12,7 @@
//! use actix_web::*; //! use actix_web::*;
//! //!
//! // do websocket handshake and start actor //! // do websocket handshake and start actor
//! fn ws_index(req: HttpRequest) -> Result<Reply> { //! fn ws_index(req: HttpRequest) -> Result<HttpResponse> {
//! ws::start(req, Ws) //! ws::start(req, Ws)
//! } //! }
//! //!
@ -52,13 +52,11 @@ use futures::{Async, Poll, Stream};
use actix::{Actor, AsyncContext, ResponseType, StreamHandler}; use actix::{Actor, AsyncContext, ResponseType, StreamHandler};
use body::Body;
use context::HttpContext;
use handler::Reply;
use payload::ReadAny; use payload::ReadAny;
use error::{Error, WsHandshakeError}; use error::{Error, WsHandshakeError};
use context::HttpContext;
use httprequest::HttpRequest; use httprequest::HttpRequest;
use httpresponse::{ConnectionType, HttpResponse}; use httpresponse::{ConnectionType, HttpResponse, HttpResponseBuilder};
use wsframe; use wsframe;
use wsproto::*; use wsproto::*;
@ -88,17 +86,17 @@ impl ResponseType for Message {
} }
/// Do websocket handshake and start actor /// Do websocket handshake and start actor
pub fn start<A, S>(mut req: HttpRequest<S>, actor: A) -> Result<Reply, Error> pub fn start<A, S>(mut req: HttpRequest<S>, actor: A) -> Result<HttpResponse, Error>
where A: Actor<Context=HttpContext<A, S>> + StreamHandler<Message>, where A: Actor<Context=HttpContext<A, S>> + StreamHandler<Message>,
S: 'static S: 'static
{ {
let resp = handshake(&req)?; let mut resp = handshake(&req)?;
let stream = WsStream::new(req.payload_mut().readany()); let stream = WsStream::new(req.payload_mut().readany());
let mut ctx = HttpContext::new(req, actor); let mut ctx = HttpContext::new(req, actor);
ctx.start(resp);
ctx.add_stream(stream); ctx.add_stream(stream);
Ok(ctx.into())
Ok(resp.body(ctx)?)
} }
/// Prepare `WebSocket` handshake response. /// Prepare `WebSocket` handshake response.
@ -109,7 +107,7 @@ pub fn start<A, S>(mut req: HttpRequest<S>, actor: A) -> Result<Reply, Error>
// /// `protocols` is a sequence of known protocols. On successful handshake, // /// `protocols` is a sequence of known protocols. On successful handshake,
// /// the returned response headers contain the first protocol in this list // /// the returned response headers contain the first protocol in this list
// /// which the server also knows. // /// which the server also knows.
pub fn handshake<S>(req: &HttpRequest<S>) -> Result<HttpResponse, WsHandshakeError> { pub fn handshake<S>(req: &HttpRequest<S>) -> Result<HttpResponseBuilder, WsHandshakeError> {
// WebSocket accepts only GET // WebSocket accepts only GET
if *req.method() != Method::GET { if *req.method() != Method::GET {
return Err(WsHandshakeError::GetMethodRequired) return Err(WsHandshakeError::GetMethodRequired)
@ -163,8 +161,7 @@ pub fn handshake<S>(req: &HttpRequest<S>) -> Result<HttpResponse, WsHandshakeErr
.header(header::UPGRADE, "websocket") .header(header::UPGRADE, "websocket")
.header(header::TRANSFER_ENCODING, "chunked") .header(header::TRANSFER_ENCODING, "chunked")
.header(SEC_WEBSOCKET_ACCEPT, key.as_str()) .header(SEC_WEBSOCKET_ACCEPT, key.as_str())
.body(Body::UpgradeContext).unwrap() .take())
)
} }
/// Maps `Payload` stream into stream of `ws::Message` items /// Maps `Payload` stream into stream of `ws::Message` items
@ -401,6 +398,7 @@ mod tests {
header::HeaderValue::from_static("13")); header::HeaderValue::from_static("13"));
let req = HttpRequest::new(Method::GET, Uri::from_str("/").unwrap(), let req = HttpRequest::new(Method::GET, Uri::from_str("/").unwrap(),
Version::HTTP_11, headers, None); Version::HTTP_11, headers, None);
assert_eq!(StatusCode::SWITCHING_PROTOCOLS, handshake(&req).unwrap().status()); assert_eq!(StatusCode::SWITCHING_PROTOCOLS,
handshake(&req).unwrap().finish().unwrap().status());
} }
} }