mirror of
https://github.com/fafhrd91/actix-web
synced 2024-11-24 08:22:59 +01:00
refactor http actor usage
This commit is contained in:
parent
967d3244d7
commit
cc38b30f7b
@ -13,8 +13,8 @@ use actix_web::*;
|
|||||||
|
|
||||||
|
|
||||||
/// do websocket handshake and start `MyWebSocket` actor
|
/// do websocket handshake and start `MyWebSocket` actor
|
||||||
fn ws_index(r: HttpRequest) -> Reply {
|
fn ws_index(r: HttpRequest) -> Result<HttpResponse, Error> {
|
||||||
ws::start(r, MyWebSocket).into()
|
ws::start(r, MyWebSocket)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// websocket connection is long running connection, it easier
|
/// websocket connection is long running connection, it easier
|
||||||
|
@ -72,7 +72,7 @@ Create `Logger` middleware with the specified `format`.
|
|||||||
Default `Logger` could be created with `default` method, it uses the default format:
|
Default `Logger` could be created with `default` method, it uses the default format:
|
||||||
|
|
||||||
```ignore
|
```ignore
|
||||||
%a %t "%r" %s %b "%{Referrer}i" "%{User-Agent}i" %T
|
%a %t "%r" %s %b "%{Referer}i" "%{User-Agent}i" %T
|
||||||
```
|
```
|
||||||
```rust
|
```rust
|
||||||
# extern crate actix_web;
|
# extern crate actix_web;
|
||||||
|
@ -110,7 +110,7 @@ fn index(req: HttpRequest<State>) -> Box<Future<Item=HttpResponse, Error=Error>>
|
|||||||
.and_then(|res| {
|
.and_then(|res| {
|
||||||
match res {
|
match res {
|
||||||
Ok(user) => Ok(httpcodes::HTTPOk.build().json(user)?),
|
Ok(user) => Ok(httpcodes::HTTPOk.build().json(user)?),
|
||||||
Err(_) => Ok(httpcodes::HTTPInternalServerError.response())
|
Err(_) => Ok(httpcodes::HTTPInternalServerError.into())
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.responder()
|
.responder()
|
||||||
|
@ -65,7 +65,7 @@ impl<S> Handler<S> for MyHandler {
|
|||||||
/// Handle request
|
/// Handle request
|
||||||
fn handle(&mut self, req: HttpRequest<S>) -> Self::Result {
|
fn handle(&mut self, req: HttpRequest<S>) -> Self::Result {
|
||||||
self.0 += 1;
|
self.0 += 1;
|
||||||
httpcodes::HTTPOk.response()
|
httpcodes::HTTPOk.into()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
# fn main() {}
|
# fn main() {}
|
||||||
@ -91,7 +91,7 @@ impl<S> Handler<S> for MyHandler {
|
|||||||
fn handle(&mut self, req: HttpRequest<S>) -> Self::Result {
|
fn handle(&mut self, req: HttpRequest<S>) -> Self::Result {
|
||||||
let num = self.0.load(Ordering::Relaxed) + 1;
|
let num = self.0.load(Ordering::Relaxed) + 1;
|
||||||
self.0.store(num, Ordering::Relaxed);
|
self.0.store(num, Ordering::Relaxed);
|
||||||
httpcodes::HTTPOk.response()
|
httpcodes::HTTPOk.into()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -246,7 +246,7 @@ fn index(mut req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>> {
|
|||||||
.from_err()
|
.from_err()
|
||||||
.and_then(|params| { // <- url encoded parameters
|
.and_then(|params| { // <- url encoded parameters
|
||||||
println!("==== BODY ==== {:?}", params);
|
println!("==== BODY ==== {:?}", params);
|
||||||
ok(httpcodes::HTTPOk.response())
|
ok(httpcodes::HTTPOk.into())
|
||||||
})
|
})
|
||||||
.responder()
|
.responder()
|
||||||
}
|
}
|
||||||
|
@ -20,10 +20,10 @@ use actix_web::test::TestRequest;
|
|||||||
fn index(req: HttpRequest) -> HttpResponse {
|
fn index(req: HttpRequest) -> HttpResponse {
|
||||||
if let Some(hdr) = req.headers().get(header::CONTENT_TYPE) {
|
if let Some(hdr) = req.headers().get(header::CONTENT_TYPE) {
|
||||||
if let Ok(s) = hdr.to_str() {
|
if let Ok(s) = hdr.to_str() {
|
||||||
return httpcodes::HTTPOk.response()
|
return httpcodes::HTTPOk.into()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
httpcodes::HTTPBadRequest.response()
|
httpcodes::HTTPBadRequest.into()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
@ -60,7 +60,7 @@ use actix_web::*;
|
|||||||
use actix_web::test::TestServer;
|
use actix_web::test::TestServer;
|
||||||
|
|
||||||
fn index(req: HttpRequest) -> HttpResponse {
|
fn index(req: HttpRequest) -> HttpResponse {
|
||||||
httpcodes::HTTPOk.response()
|
httpcodes::HTTPOk.into()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
@ -80,7 +80,7 @@ use actix_web::*;
|
|||||||
use actix_web::test::TestServer;
|
use actix_web::test::TestServer;
|
||||||
|
|
||||||
fn index(req: HttpRequest) -> HttpResponse {
|
fn index(req: HttpRequest) -> HttpResponse {
|
||||||
httpcodes::HTTPOk.response()
|
httpcodes::HTTPOk.into()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This function get called by http server.
|
/// This function get called by http server.
|
||||||
|
@ -128,7 +128,7 @@ impl<S> Application<S> where S: 'static {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Set application prefix.
|
/// Set application prefix
|
||||||
///
|
///
|
||||||
/// Only requests that matches application's prefix get processed by this application.
|
/// Only requests that matches application's prefix get processed by this application.
|
||||||
/// Application prefix always contains laading "/" slash. If supplied prefix
|
/// Application prefix always contains laading "/" slash. If supplied prefix
|
||||||
|
32
src/body.rs
32
src/body.rs
@ -5,6 +5,7 @@ use bytes::{Bytes, BytesMut};
|
|||||||
use futures::Stream;
|
use futures::Stream;
|
||||||
|
|
||||||
use error::Error;
|
use error::Error;
|
||||||
|
use context::ActorHttpContext;
|
||||||
|
|
||||||
/// Type represent streaming body
|
/// Type represent streaming body
|
||||||
pub type BodyStream = Box<Stream<Item=Bytes, Error=Error>>;
|
pub type BodyStream = Box<Stream<Item=Bytes, Error=Error>>;
|
||||||
@ -18,12 +19,8 @@ pub enum Body {
|
|||||||
/// Unspecified streaming response. Developer is responsible for setting
|
/// Unspecified streaming response. Developer is responsible for setting
|
||||||
/// right `Content-Length` or `Transfer-Encoding` headers.
|
/// right `Content-Length` or `Transfer-Encoding` headers.
|
||||||
Streaming(BodyStream),
|
Streaming(BodyStream),
|
||||||
/// Upgrade connection.
|
/// Special body type for actor response.
|
||||||
Upgrade(BodyStream),
|
Actor(Box<ActorHttpContext>),
|
||||||
/// Special body type for actor streaming response.
|
|
||||||
StreamingContext,
|
|
||||||
/// Special body type for actor upgrade response.
|
|
||||||
UpgradeContext,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Represents various types of binary body.
|
/// Represents various types of binary body.
|
||||||
@ -51,8 +48,7 @@ impl Body {
|
|||||||
#[inline]
|
#[inline]
|
||||||
pub fn is_streaming(&self) -> bool {
|
pub fn is_streaming(&self) -> bool {
|
||||||
match *self {
|
match *self {
|
||||||
Body::Streaming(_) | Body::StreamingContext
|
Body::Streaming(_) | Body::Actor(_) => true,
|
||||||
| Body::Upgrade(_) | Body::UpgradeContext => true,
|
|
||||||
_ => false
|
_ => false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -83,15 +79,7 @@ impl PartialEq for Body {
|
|||||||
Body::Binary(ref b2) => b == b2,
|
Body::Binary(ref b2) => b == b2,
|
||||||
_ => false,
|
_ => false,
|
||||||
},
|
},
|
||||||
Body::StreamingContext => match *other {
|
Body::Streaming(_) | Body::Actor(_) => false,
|
||||||
Body::StreamingContext => true,
|
|
||||||
_ => false,
|
|
||||||
},
|
|
||||||
Body::UpgradeContext => match *other {
|
|
||||||
Body::UpgradeContext => true,
|
|
||||||
_ => false,
|
|
||||||
},
|
|
||||||
Body::Streaming(_) | Body::Upgrade(_) => false,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -102,9 +90,7 @@ impl fmt::Debug for Body {
|
|||||||
Body::Empty => write!(f, "Body::Empty"),
|
Body::Empty => write!(f, "Body::Empty"),
|
||||||
Body::Binary(ref b) => write!(f, "Body::Binary({:?})", b),
|
Body::Binary(ref b) => write!(f, "Body::Binary({:?})", b),
|
||||||
Body::Streaming(_) => write!(f, "Body::Streaming(_)"),
|
Body::Streaming(_) => write!(f, "Body::Streaming(_)"),
|
||||||
Body::Upgrade(_) => write!(f, "Body::Upgrade(_)"),
|
Body::Actor(_) => write!(f, "Body::Actor(_)"),
|
||||||
Body::StreamingContext => write!(f, "Body::StreamingContext"),
|
|
||||||
Body::UpgradeContext => write!(f, "Body::UpgradeContext"),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -115,6 +101,12 @@ impl<T> From<T> for Body where T: Into<Binary>{
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl From<Box<ActorHttpContext>> for Body {
|
||||||
|
fn from(ctx: Box<ActorHttpContext>) -> Body {
|
||||||
|
Body::Actor(ctx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Binary {
|
impl Binary {
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn is_empty(&self) -> bool {
|
pub fn is_empty(&self) -> bool {
|
||||||
|
103
src/context.rs
103
src/context.rs
@ -1,6 +1,7 @@
|
|||||||
use std;
|
use std;
|
||||||
|
use std::marker::PhantomData;
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
use futures::{Async, Poll};
|
use futures::{Async, Future, Poll};
|
||||||
use futures::sync::oneshot::Sender;
|
use futures::sync::oneshot::Sender;
|
||||||
use futures::unsync::oneshot;
|
use futures::unsync::oneshot;
|
||||||
|
|
||||||
@ -11,19 +12,18 @@ use actix::dev::{AsyncContextApi, ActorAddressCell, ActorItemsCell, ActorWaitCel
|
|||||||
Envelope, ToEnvelope, RemoteEnvelope};
|
Envelope, ToEnvelope, RemoteEnvelope};
|
||||||
|
|
||||||
use body::{Body, Binary};
|
use body::{Body, Binary};
|
||||||
use error::Error;
|
use error::{Error, Result};
|
||||||
use httprequest::HttpRequest;
|
use httprequest::HttpRequest;
|
||||||
use httpresponse::HttpResponse;
|
use httpresponse::HttpResponse;
|
||||||
|
|
||||||
|
|
||||||
pub(crate) trait IoContext: 'static {
|
pub trait ActorHttpContext: 'static {
|
||||||
fn disconnected(&mut self);
|
fn disconnected(&mut self);
|
||||||
fn poll(&mut self) -> Poll<Option<Frame>, Error>;
|
fn poll(&mut self) -> Poll<Option<Frame>, Error>;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub(crate) enum Frame {
|
pub enum Frame {
|
||||||
Message(HttpResponse),
|
|
||||||
Payload(Option<Binary>),
|
Payload(Option<Binary>),
|
||||||
Drain(oneshot::Sender<()>),
|
Drain(oneshot::Sender<()>),
|
||||||
}
|
}
|
||||||
@ -31,7 +31,7 @@ pub(crate) enum Frame {
|
|||||||
/// Http actor execution context
|
/// Http actor execution context
|
||||||
pub struct HttpContext<A, S=()> where A: Actor<Context=HttpContext<A, S>>,
|
pub struct HttpContext<A, S=()> where A: Actor<Context=HttpContext<A, S>>,
|
||||||
{
|
{
|
||||||
act: A,
|
act: Option<A>,
|
||||||
state: ActorState,
|
state: ActorState,
|
||||||
modified: bool,
|
modified: bool,
|
||||||
items: ActorItemsCell<A>,
|
items: ActorItemsCell<A>,
|
||||||
@ -39,7 +39,6 @@ pub struct HttpContext<A, S=()> where A: Actor<Context=HttpContext<A, S>>,
|
|||||||
stream: VecDeque<Frame>,
|
stream: VecDeque<Frame>,
|
||||||
wait: ActorWaitCell<A>,
|
wait: ActorWaitCell<A>,
|
||||||
request: HttpRequest<S>,
|
request: HttpRequest<S>,
|
||||||
streaming: bool,
|
|
||||||
disconnected: bool,
|
disconnected: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -101,12 +100,15 @@ impl<A, S> AsyncContextApi<A> for HttpContext<A, S> where A: Actor<Context=Self>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> {
|
impl<A, S: 'static> HttpContext<A, S> where A: Actor<Context=Self> {
|
||||||
|
|
||||||
pub fn new(req: HttpRequest<S>, actor: A) -> HttpContext<A, S>
|
pub fn new(req: HttpRequest<S>, actor: A) -> HttpContext<A, S> {
|
||||||
{
|
HttpContext::from_request(req).actor(actor)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn from_request(req: HttpRequest<S>) -> HttpContext<A, S> {
|
||||||
HttpContext {
|
HttpContext {
|
||||||
act: actor,
|
act: None,
|
||||||
state: ActorState::Started,
|
state: ActorState::Started,
|
||||||
modified: false,
|
modified: false,
|
||||||
items: ActorItemsCell::default(),
|
items: ActorItemsCell::default(),
|
||||||
@ -114,10 +116,24 @@ impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> {
|
|||||||
wait: ActorWaitCell::default(),
|
wait: ActorWaitCell::default(),
|
||||||
stream: VecDeque::new(),
|
stream: VecDeque::new(),
|
||||||
request: req,
|
request: req,
|
||||||
streaming: false,
|
|
||||||
disconnected: false,
|
disconnected: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn actor(mut self, actor: A) -> HttpContext<A, S> {
|
||||||
|
self.act = Some(actor);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn with_actor(mut self, actor: A, mut resp: HttpResponse) -> Result<HttpResponse> {
|
||||||
|
if self.act.is_some() {
|
||||||
|
panic!("Actor is set already");
|
||||||
|
}
|
||||||
|
self.act = Some(actor);
|
||||||
|
|
||||||
|
resp.replace_body(Body::Actor(Box::new(self)));
|
||||||
|
Ok(resp)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> {
|
impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> {
|
||||||
@ -132,24 +148,12 @@ impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> {
|
|||||||
&mut self.request
|
&mut self.request
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send response to peer
|
|
||||||
pub fn start<R: Into<HttpResponse>>(&mut self, response: R) {
|
|
||||||
let resp = response.into();
|
|
||||||
match *resp.body() {
|
|
||||||
Body::StreamingContext | Body::UpgradeContext => self.streaming = true,
|
|
||||||
_ => (),
|
|
||||||
}
|
|
||||||
self.stream.push_back(Frame::Message(resp))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Write payload
|
/// Write payload
|
||||||
pub fn write<B: Into<Binary>>(&mut self, data: B) {
|
pub fn write<B: Into<Binary>>(&mut self, data: B) {
|
||||||
if self.streaming {
|
|
||||||
if !self.disconnected {
|
if !self.disconnected {
|
||||||
self.stream.push_back(Frame::Payload(Some(data.into())))
|
self.stream.push_back(Frame::Payload(Some(data.into())));
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
warn!("Trying to write response body for non-streaming response");
|
warn!("Trying to write to disconnected response");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -159,11 +163,11 @@ impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Returns drain future
|
/// Returns drain future
|
||||||
pub fn drain(&mut self) -> oneshot::Receiver<()> {
|
pub fn drain(&mut self) -> Drain<A> {
|
||||||
let (tx, rx) = oneshot::channel();
|
let (tx, rx) = oneshot::channel();
|
||||||
self.modified = true;
|
self.modified = true;
|
||||||
self.stream.push_back(Frame::Drain(tx));
|
self.stream.push_back(Frame::Drain(tx));
|
||||||
rx
|
Drain::new(rx)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check if connection still open
|
/// Check if connection still open
|
||||||
@ -193,7 +197,7 @@ impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<A, S> IoContext for HttpContext<A, S> where A: Actor<Context=Self>, S: 'static {
|
impl<A, S> ActorHttpContext for HttpContext<A, S> where A: Actor<Context=Self>, S: 'static {
|
||||||
|
|
||||||
fn disconnected(&mut self) {
|
fn disconnected(&mut self) {
|
||||||
self.items.stop();
|
self.items.stop();
|
||||||
@ -204,8 +208,11 @@ impl<A, S> IoContext for HttpContext<A, S> where A: Actor<Context=Self>, S: 'sta
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Option<Frame>, Error> {
|
fn poll(&mut self) -> Poll<Option<Frame>, Error> {
|
||||||
|
if self.act.is_none() {
|
||||||
|
return Ok(Async::Ready(None))
|
||||||
|
}
|
||||||
let act: &mut A = unsafe {
|
let act: &mut A = unsafe {
|
||||||
std::mem::transmute(&mut self.act as &mut A)
|
std::mem::transmute(self.act.as_mut().unwrap() as &mut A)
|
||||||
};
|
};
|
||||||
let ctx: &mut HttpContext<A, S> = unsafe {
|
let ctx: &mut HttpContext<A, S> = unsafe {
|
||||||
std::mem::transmute(self as &mut HttpContext<A, S>)
|
std::mem::transmute(self as &mut HttpContext<A, S>)
|
||||||
@ -303,3 +310,39 @@ impl<A, S> ToEnvelope<A> for HttpContext<A, S>
|
|||||||
RemoteEnvelope::new(msg, tx).into()
|
RemoteEnvelope::new(msg, tx).into()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<A, S> From<HttpContext<A, S>> for Body
|
||||||
|
where A: Actor<Context=HttpContext<A, S>>,
|
||||||
|
S: 'static
|
||||||
|
{
|
||||||
|
fn from(ctx: HttpContext<A, S>) -> Body {
|
||||||
|
Body::Actor(Box::new(ctx))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Drain<A> {
|
||||||
|
fut: oneshot::Receiver<()>,
|
||||||
|
_a: PhantomData<A>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<A> Drain<A> {
|
||||||
|
fn new(fut: oneshot::Receiver<()>) -> Self {
|
||||||
|
Drain {
|
||||||
|
fut: fut,
|
||||||
|
_a: PhantomData
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<A: Actor> ActorFuture for Drain<A> {
|
||||||
|
type Item = ();
|
||||||
|
type Error = ();
|
||||||
|
type Actor = A;
|
||||||
|
|
||||||
|
fn poll(&mut self,
|
||||||
|
_: &mut A,
|
||||||
|
_: &mut <Self::Actor as Actor>::Context) -> Poll<Self::Item, Self::Error>
|
||||||
|
{
|
||||||
|
self.fut.poll().map_err(|_| ())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -416,8 +416,20 @@ impl PayloadEncoder {
|
|||||||
resp.headers_mut().remove(CONTENT_LENGTH);
|
resp.headers_mut().remove(CONTENT_LENGTH);
|
||||||
TransferEncoding::eof(buf)
|
TransferEncoding::eof(buf)
|
||||||
}
|
}
|
||||||
Body::Streaming(_) | Body::StreamingContext => {
|
Body::Streaming(_) | Body::Actor(_) => {
|
||||||
if resp.chunked() {
|
if resp.upgrade() {
|
||||||
|
if version == Version::HTTP_2 {
|
||||||
|
error!("Connection upgrade is forbidden for HTTP/2");
|
||||||
|
} else {
|
||||||
|
resp.headers_mut().insert(
|
||||||
|
CONNECTION, HeaderValue::from_static("upgrade"));
|
||||||
|
}
|
||||||
|
if encoding != ContentEncoding::Identity {
|
||||||
|
encoding = ContentEncoding::Identity;
|
||||||
|
resp.headers_mut().remove(CONTENT_ENCODING);
|
||||||
|
}
|
||||||
|
TransferEncoding::eof(buf)
|
||||||
|
} else if resp.chunked() {
|
||||||
resp.headers_mut().remove(CONTENT_LENGTH);
|
resp.headers_mut().remove(CONTENT_LENGTH);
|
||||||
if version != Version::HTTP_11 {
|
if version != Version::HTTP_11 {
|
||||||
error!("Chunked transfer encoding is forbidden for {:?}", version);
|
error!("Chunked transfer encoding is forbidden for {:?}", version);
|
||||||
@ -446,19 +458,6 @@ impl PayloadEncoder {
|
|||||||
TransferEncoding::eof(buf)
|
TransferEncoding::eof(buf)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Body::Upgrade(_) | Body::UpgradeContext => {
|
|
||||||
if version == Version::HTTP_2 {
|
|
||||||
error!("Connection upgrade is forbidden for HTTP/2");
|
|
||||||
} else {
|
|
||||||
resp.headers_mut().insert(
|
|
||||||
CONNECTION, HeaderValue::from_static("upgrade"));
|
|
||||||
}
|
|
||||||
if encoding != ContentEncoding::Identity {
|
|
||||||
encoding = ContentEncoding::Identity;
|
|
||||||
resp.headers_mut().remove(CONTENT_ENCODING);
|
|
||||||
}
|
|
||||||
TransferEncoding::eof(buf)
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
resp.replace_body(body);
|
resp.replace_body(body);
|
||||||
|
|
||||||
|
@ -114,14 +114,6 @@ impl ResponseError for header::InvalidHeaderValue {}
|
|||||||
/// `InternalServerError` for `futures::Canceled`
|
/// `InternalServerError` for `futures::Canceled`
|
||||||
impl ResponseError for Canceled {}
|
impl ResponseError for Canceled {}
|
||||||
|
|
||||||
/// Internal error
|
|
||||||
#[doc(hidden)]
|
|
||||||
#[derive(Fail, Debug)]
|
|
||||||
#[fail(display="Unexpected task frame")]
|
|
||||||
pub struct UnexpectedTaskFrame;
|
|
||||||
|
|
||||||
impl ResponseError for UnexpectedTaskFrame {}
|
|
||||||
|
|
||||||
/// A set of errors that can occur during parsing HTTP streams
|
/// A set of errors that can occur during parsing HTTP streams
|
||||||
#[derive(Fail, Debug)]
|
#[derive(Fail, Debug)]
|
||||||
pub enum ParseError {
|
pub enum ParseError {
|
||||||
|
@ -1,13 +1,11 @@
|
|||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
|
|
||||||
use actix::Actor;
|
|
||||||
use futures::future::{Future, ok, err};
|
|
||||||
use regex::Regex;
|
use regex::Regex;
|
||||||
|
use futures::future::{Future, ok, err};
|
||||||
use http::{header, StatusCode, Error as HttpError};
|
use http::{header, StatusCode, Error as HttpError};
|
||||||
|
|
||||||
use body::Body;
|
use body::Body;
|
||||||
use error::Error;
|
use error::Error;
|
||||||
use context::{HttpContext, IoContext};
|
|
||||||
use httprequest::HttpRequest;
|
use httprequest::HttpRequest;
|
||||||
use httpresponse::HttpResponse;
|
use httpresponse::HttpResponse;
|
||||||
|
|
||||||
@ -69,20 +67,11 @@ pub struct Reply(ReplyItem);
|
|||||||
|
|
||||||
pub(crate) enum ReplyItem {
|
pub(crate) enum ReplyItem {
|
||||||
Message(HttpResponse),
|
Message(HttpResponse),
|
||||||
Actor(Box<IoContext>),
|
|
||||||
Future(Box<Future<Item=HttpResponse, Error=Error>>),
|
Future(Box<Future<Item=HttpResponse, Error=Error>>),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Reply {
|
impl Reply {
|
||||||
|
|
||||||
/// Create actor response
|
|
||||||
#[inline]
|
|
||||||
pub fn actor<A, S>(ctx: HttpContext<A, S>) -> Reply
|
|
||||||
where A: Actor<Context=HttpContext<A, S>>, S: 'static
|
|
||||||
{
|
|
||||||
Reply(ReplyItem::Actor(Box::new(ctx)))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Create async response
|
/// Create async response
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn async<F>(fut: F) -> Reply
|
pub fn async<F>(fut: F) -> Reply
|
||||||
@ -163,25 +152,6 @@ impl<E: Into<Error>> From<Result<Reply, E>> for Reply {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<A: Actor<Context=HttpContext<A, S>>, S: 'static> Responder for HttpContext<A, S>
|
|
||||||
{
|
|
||||||
type Item = Reply;
|
|
||||||
type Error = Error;
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn respond_to(self, _: HttpRequest) -> Result<Reply, Error> {
|
|
||||||
Ok(Reply(ReplyItem::Actor(Box::new(self))))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<A: Actor<Context=HttpContext<A, S>>, S: 'static> From<HttpContext<A, S>> for Reply {
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn from(ctx: HttpContext<A, S>) -> Reply {
|
|
||||||
Reply(ReplyItem::Actor(Box::new(ctx)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<I, E> Responder for Box<Future<Item=I, Error=E>>
|
impl<I, E> Responder for Box<Future<Item=I, Error=E>>
|
||||||
where I: Responder + 'static,
|
where I: Responder + 'static,
|
||||||
E: Into<Error> + 'static
|
E: Into<Error> + 'static
|
||||||
|
@ -54,9 +54,6 @@ impl StaticResponse {
|
|||||||
pub fn build(&self) -> HttpResponseBuilder {
|
pub fn build(&self) -> HttpResponseBuilder {
|
||||||
HttpResponse::build(self.0)
|
HttpResponse::build(self.0)
|
||||||
}
|
}
|
||||||
pub fn response(&self) -> HttpResponse {
|
|
||||||
HttpResponse::new(self.0, Body::Empty)
|
|
||||||
}
|
|
||||||
pub fn with_reason(self, reason: &'static str) -> HttpResponse {
|
pub fn with_reason(self, reason: &'static str) -> HttpResponse {
|
||||||
let mut resp = HttpResponse::new(self.0, Body::Empty);
|
let mut resp = HttpResponse::new(self.0, Body::Empty);
|
||||||
resp.set_reason(reason);
|
resp.set_reason(reason);
|
||||||
@ -92,7 +89,7 @@ impl Responder for StaticResponse {
|
|||||||
|
|
||||||
impl From<StaticResponse> for HttpResponse {
|
impl From<StaticResponse> for HttpResponse {
|
||||||
fn from(st: StaticResponse) -> Self {
|
fn from(st: StaticResponse) -> Self {
|
||||||
st.response()
|
HttpResponse::new(st.0, Body::Empty)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -153,7 +150,7 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_response() {
|
fn test_response() {
|
||||||
let resp = HTTPOk.response();
|
let resp: HttpResponse = HTTPOk.into();
|
||||||
assert_eq!(resp.status(), StatusCode::OK);
|
assert_eq!(resp.status(), StatusCode::OK);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -165,7 +162,7 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_with_reason() {
|
fn test_with_reason() {
|
||||||
let resp = HTTPOk.response();
|
let resp: HttpResponse = HTTPOk.into();
|
||||||
assert_eq!(resp.reason(), "OK");
|
assert_eq!(resp.reason(), "OK");
|
||||||
|
|
||||||
let resp = HTTPBadRequest.with_reason("test");
|
let resp = HTTPBadRequest.with_reason("test");
|
||||||
|
@ -135,6 +135,12 @@ impl HttpRequest<()> {
|
|||||||
|
|
||||||
impl<S> HttpRequest<S> {
|
impl<S> HttpRequest<S> {
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
/// Construct new http request with state.
|
||||||
|
pub fn change_state<NS>(self, state: Rc<NS>) -> HttpRequest<NS> {
|
||||||
|
HttpRequest(self.0, Some(state), self.2.clone())
|
||||||
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
/// Construct new http request without state.
|
/// Construct new http request without state.
|
||||||
pub(crate) fn clone_without_state(&self) -> HttpRequest {
|
pub(crate) fn clone_without_state(&self) -> HttpRequest {
|
||||||
@ -447,7 +453,7 @@ impl<S> HttpRequest<S> {
|
|||||||
/// }
|
/// }
|
||||||
/// })
|
/// })
|
||||||
/// .finish() // <- Stream::finish() combinator from actix
|
/// .finish() // <- Stream::finish() combinator from actix
|
||||||
/// .map(|_| httpcodes::HTTPOk.response())
|
/// .map(|_| httpcodes::HTTPOk.into())
|
||||||
/// .responder()
|
/// .responder()
|
||||||
/// }
|
/// }
|
||||||
/// # fn main() {}
|
/// # fn main() {}
|
||||||
@ -477,7 +483,7 @@ impl<S> HttpRequest<S> {
|
|||||||
/// .from_err()
|
/// .from_err()
|
||||||
/// .and_then(|params| { // <- url encoded parameters
|
/// .and_then(|params| { // <- url encoded parameters
|
||||||
/// println!("==== BODY ==== {:?}", params);
|
/// println!("==== BODY ==== {:?}", params);
|
||||||
/// ok(httpcodes::HTTPOk.response())
|
/// ok(httpcodes::HTTPOk.into())
|
||||||
/// })
|
/// })
|
||||||
/// .responder()
|
/// .responder()
|
||||||
/// }
|
/// }
|
||||||
@ -512,7 +518,7 @@ impl<S> HttpRequest<S> {
|
|||||||
/// .from_err()
|
/// .from_err()
|
||||||
/// .and_then(|val: MyObj| { // <- deserialized value
|
/// .and_then(|val: MyObj| { // <- deserialized value
|
||||||
/// println!("==== BODY ==== {:?}", val);
|
/// println!("==== BODY ==== {:?}", val);
|
||||||
/// Ok(httpcodes::HTTPOk.response())
|
/// Ok(httpcodes::HTTPOk.into())
|
||||||
/// }).responder()
|
/// }).responder()
|
||||||
/// }
|
/// }
|
||||||
/// # fn main() {}
|
/// # fn main() {}
|
||||||
|
@ -138,6 +138,7 @@ impl HttpResponse {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Connection upgrade status
|
/// Connection upgrade status
|
||||||
|
#[inline]
|
||||||
pub fn upgrade(&self) -> bool {
|
pub fn upgrade(&self) -> bool {
|
||||||
self.get_ref().connection_type == Some(ConnectionType::Upgrade)
|
self.get_ref().connection_type == Some(ConnectionType::Upgrade)
|
||||||
}
|
}
|
||||||
@ -155,11 +156,13 @@ impl HttpResponse {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// is chunked encoding enabled
|
/// is chunked encoding enabled
|
||||||
|
#[inline]
|
||||||
pub fn chunked(&self) -> bool {
|
pub fn chunked(&self) -> bool {
|
||||||
self.get_ref().chunked
|
self.get_ref().chunked
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Content encoding
|
/// Content encoding
|
||||||
|
#[inline]
|
||||||
pub fn content_encoding(&self) -> &ContentEncoding {
|
pub fn content_encoding(&self) -> &ContentEncoding {
|
||||||
&self.get_ref().encoding
|
&self.get_ref().encoding
|
||||||
}
|
}
|
||||||
@ -171,6 +174,7 @@ impl HttpResponse {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Get body os this response
|
/// Get body os this response
|
||||||
|
#[inline]
|
||||||
pub fn body(&self) -> &Body {
|
pub fn body(&self) -> &Body {
|
||||||
&self.get_ref().body
|
&self.get_ref().body
|
||||||
}
|
}
|
||||||
@ -443,6 +447,15 @@ impl HttpResponseBuilder {
|
|||||||
pub fn finish(&mut self) -> Result<HttpResponse, HttpError> {
|
pub fn finish(&mut self) -> Result<HttpResponse, HttpError> {
|
||||||
self.body(Body::Empty)
|
self.body(Body::Empty)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// This method construct new `HttpResponseBuilder`
|
||||||
|
pub fn take(&mut self) -> HttpResponseBuilder {
|
||||||
|
HttpResponseBuilder {
|
||||||
|
response: self.response.take(),
|
||||||
|
err: self.err.take(),
|
||||||
|
cookies: self.cookies.take(),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
|
@ -71,7 +71,7 @@ impl<T: Serialize> Responder for Json<T> {
|
|||||||
/// .from_err()
|
/// .from_err()
|
||||||
/// .and_then(|val: MyObj| { // <- deserialized value
|
/// .and_then(|val: MyObj| { // <- deserialized value
|
||||||
/// println!("==== BODY ==== {:?}", val);
|
/// println!("==== BODY ==== {:?}", val);
|
||||||
/// Ok(httpcodes::HTTPOk.response())
|
/// Ok(httpcodes::HTTPOk.into())
|
||||||
/// }).responder()
|
/// }).responder()
|
||||||
/// }
|
/// }
|
||||||
/// # fn main() {}
|
/// # fn main() {}
|
||||||
|
@ -19,7 +19,7 @@ use middleware::{Middleware, Started, Finished};
|
|||||||
/// Default `Logger` could be created with `default` method, it uses the default format:
|
/// Default `Logger` could be created with `default` method, it uses the default format:
|
||||||
///
|
///
|
||||||
/// ```ignore
|
/// ```ignore
|
||||||
/// %a %t "%r" %s %b "%{Referrer}i" "%{User-Agent}i" %T
|
/// %a %t "%r" %s %b "%{Referer}i" "%{User-Agent}i" %T
|
||||||
/// ```
|
/// ```
|
||||||
/// ```rust
|
/// ```rust
|
||||||
/// # extern crate actix_web;
|
/// # extern crate actix_web;
|
||||||
@ -75,7 +75,7 @@ impl Default for Logger {
|
|||||||
/// Create `Logger` middleware with format:
|
/// Create `Logger` middleware with format:
|
||||||
///
|
///
|
||||||
/// ```ignore
|
/// ```ignore
|
||||||
/// %a %t "%r" %s %b "%{Referrer}i" "%{User-Agent}i" %T
|
/// %a %t "%r" %s %b "%{Referer}i" "%{User-Agent}i" %T
|
||||||
/// ```
|
/// ```
|
||||||
fn default() -> Logger {
|
fn default() -> Logger {
|
||||||
Logger { format: Format::default() }
|
Logger { format: Format::default() }
|
||||||
@ -121,7 +121,7 @@ struct Format(Vec<FormatText>);
|
|||||||
impl Default for Format {
|
impl Default for Format {
|
||||||
/// Return the default formatting style for the `Logger`:
|
/// Return the default formatting style for the `Logger`:
|
||||||
fn default() -> Format {
|
fn default() -> Format {
|
||||||
Format::new(r#"%a %t "%r" %s %b "%{Referrer}i" "%{User-Agent}i" %T"#)
|
Format::new(r#"%a %t "%r" %s %b "%{Referer}i" "%{User-Agent}i" %T"#)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -9,20 +9,14 @@ use futures::{Future, Async, Poll, Stream};
|
|||||||
use futures::task::{Task, current as current_task};
|
use futures::task::{Task, current as current_task};
|
||||||
|
|
||||||
use body::BodyStream;
|
use body::BodyStream;
|
||||||
use actix::ResponseType;
|
|
||||||
use error::PayloadError;
|
use error::PayloadError;
|
||||||
|
|
||||||
pub(crate) const DEFAULT_BUFFER_SIZE: usize = 65_536; // max buffer size 64k
|
pub(crate) const DEFAULT_BUFFER_SIZE: usize = 65_536; // max buffer size 64k
|
||||||
|
|
||||||
/// Just Bytes object
|
/// Just Bytes object
|
||||||
#[derive(PartialEq)]
|
#[derive(PartialEq, Message)]
|
||||||
pub struct PayloadItem(pub Bytes);
|
pub struct PayloadItem(pub Bytes);
|
||||||
|
|
||||||
impl ResponseType for PayloadItem {
|
|
||||||
type Item = ();
|
|
||||||
type Error = ();
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Deref for PayloadItem {
|
impl Deref for PayloadItem {
|
||||||
type Target = Bytes;
|
type Target = Bytes;
|
||||||
|
|
||||||
@ -91,27 +85,27 @@ impl Payload {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Get first available chunk of data.
|
/// Get first available chunk of data.
|
||||||
pub fn readany(&mut self) -> ReadAny {
|
pub fn readany(&self) -> ReadAny {
|
||||||
ReadAny(Rc::clone(&self.inner))
|
ReadAny(Rc::clone(&self.inner))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get exact number of bytes
|
/// Get exact number of bytes
|
||||||
pub fn readexactly(&mut self, size: usize) -> ReadExactly {
|
pub fn readexactly(&self, size: usize) -> ReadExactly {
|
||||||
ReadExactly(Rc::clone(&self.inner), size)
|
ReadExactly(Rc::clone(&self.inner), size)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Read until `\n`
|
/// Read until `\n`
|
||||||
pub fn readline(&mut self) -> ReadLine {
|
pub fn readline(&self) -> ReadLine {
|
||||||
ReadLine(Rc::clone(&self.inner))
|
ReadLine(Rc::clone(&self.inner))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Read until match line
|
/// Read until match line
|
||||||
pub fn readuntil(&mut self, line: &[u8]) -> ReadUntil {
|
pub fn readuntil(&self, line: &[u8]) -> ReadUntil {
|
||||||
ReadUntil(Rc::clone(&self.inner), line.to_vec())
|
ReadUntil(Rc::clone(&self.inner), line.to_vec())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
pub fn readall(&mut self) -> Option<Bytes> {
|
pub fn readall(&self) -> Option<Bytes> {
|
||||||
self.inner.borrow_mut().readall()
|
self.inner.borrow_mut().readall()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -132,20 +126,16 @@ impl Payload {
|
|||||||
|
|
||||||
/// Convert payload into compatible `HttpResponse` body stream
|
/// Convert payload into compatible `HttpResponse` body stream
|
||||||
pub fn stream(self) -> BodyStream {
|
pub fn stream(self) -> BodyStream {
|
||||||
Box::new(self.map_err(|e| e.into()))
|
Box::new(self.map(|i| i.0).map_err(|e| e.into()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Stream for Payload {
|
impl Stream for Payload {
|
||||||
type Item = Bytes;
|
type Item = PayloadItem;
|
||||||
type Error = PayloadError;
|
type Error = PayloadError;
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Option<Bytes>, PayloadError> {
|
fn poll(&mut self) -> Poll<Option<PayloadItem>, PayloadError> {
|
||||||
match self.inner.borrow_mut().readany()? {
|
self.inner.borrow_mut().readany()
|
||||||
Async::Ready(Some(item)) => Ok(Async::Ready(Some(item.0))),
|
|
||||||
Async::Ready(None) => Ok(Async::Ready(None)),
|
|
||||||
Async::NotReady => Ok(Async::NotReady),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -474,7 +464,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_basic() {
|
fn test_basic() {
|
||||||
Core::new().unwrap().run(lazy(|| {
|
Core::new().unwrap().run(lazy(|| {
|
||||||
let (_, mut payload) = Payload::new(false);
|
let (_, payload) = Payload::new(false);
|
||||||
|
|
||||||
assert!(!payload.eof());
|
assert!(!payload.eof());
|
||||||
assert!(payload.is_empty());
|
assert!(payload.is_empty());
|
||||||
@ -489,7 +479,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_eof() {
|
fn test_eof() {
|
||||||
Core::new().unwrap().run(lazy(|| {
|
Core::new().unwrap().run(lazy(|| {
|
||||||
let (mut sender, mut payload) = Payload::new(false);
|
let (mut sender, payload) = Payload::new(false);
|
||||||
|
|
||||||
assert_eq!(Async::NotReady, payload.readany().poll().ok().unwrap());
|
assert_eq!(Async::NotReady, payload.readany().poll().ok().unwrap());
|
||||||
assert!(!payload.eof());
|
assert!(!payload.eof());
|
||||||
@ -514,7 +504,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_err() {
|
fn test_err() {
|
||||||
Core::new().unwrap().run(lazy(|| {
|
Core::new().unwrap().run(lazy(|| {
|
||||||
let (mut sender, mut payload) = Payload::new(false);
|
let (mut sender, payload) = Payload::new(false);
|
||||||
|
|
||||||
assert_eq!(Async::NotReady, payload.readany().poll().ok().unwrap());
|
assert_eq!(Async::NotReady, payload.readany().poll().ok().unwrap());
|
||||||
|
|
||||||
@ -528,7 +518,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_readany() {
|
fn test_readany() {
|
||||||
Core::new().unwrap().run(lazy(|| {
|
Core::new().unwrap().run(lazy(|| {
|
||||||
let (mut sender, mut payload) = Payload::new(false);
|
let (mut sender, payload) = Payload::new(false);
|
||||||
|
|
||||||
sender.feed_data(Bytes::from("line1"));
|
sender.feed_data(Bytes::from("line1"));
|
||||||
|
|
||||||
@ -552,7 +542,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_readexactly() {
|
fn test_readexactly() {
|
||||||
Core::new().unwrap().run(lazy(|| {
|
Core::new().unwrap().run(lazy(|| {
|
||||||
let (mut sender, mut payload) = Payload::new(false);
|
let (mut sender, payload) = Payload::new(false);
|
||||||
|
|
||||||
assert_eq!(Async::NotReady, payload.readexactly(2).poll().ok().unwrap());
|
assert_eq!(Async::NotReady, payload.readexactly(2).poll().ok().unwrap());
|
||||||
|
|
||||||
@ -579,7 +569,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_readuntil() {
|
fn test_readuntil() {
|
||||||
Core::new().unwrap().run(lazy(|| {
|
Core::new().unwrap().run(lazy(|| {
|
||||||
let (mut sender, mut payload) = Payload::new(false);
|
let (mut sender, payload) = Payload::new(false);
|
||||||
|
|
||||||
assert_eq!(Async::NotReady, payload.readuntil(b"ne").poll().ok().unwrap());
|
assert_eq!(Async::NotReady, payload.readuntil(b"ne").poll().ok().unwrap());
|
||||||
|
|
||||||
|
@ -8,8 +8,8 @@ use futures::unsync::oneshot;
|
|||||||
|
|
||||||
use channel::HttpHandlerTask;
|
use channel::HttpHandlerTask;
|
||||||
use body::{Body, BodyStream};
|
use body::{Body, BodyStream};
|
||||||
use context::{Frame, IoContext};
|
use context::{Frame, ActorHttpContext};
|
||||||
use error::{Error, UnexpectedTaskFrame};
|
use error::Error;
|
||||||
use handler::{Reply, ReplyItem};
|
use handler::{Reply, ReplyItem};
|
||||||
use h1writer::{Writer, WriterState};
|
use h1writer::{Writer, WriterState};
|
||||||
use httprequest::HttpRequest;
|
use httprequest::HttpRequest;
|
||||||
@ -38,7 +38,7 @@ struct PipelineInfo<S> {
|
|||||||
req: HttpRequest<S>,
|
req: HttpRequest<S>,
|
||||||
count: usize,
|
count: usize,
|
||||||
mws: Rc<Vec<Box<Middleware<S>>>>,
|
mws: Rc<Vec<Box<Middleware<S>>>>,
|
||||||
context: Option<Box<IoContext>>,
|
context: Option<Box<ActorHttpContext>>,
|
||||||
error: Option<Error>,
|
error: Option<Error>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -72,12 +72,6 @@ impl<S> PipelineInfo<S> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
enum PipelineResponse {
|
|
||||||
None,
|
|
||||||
Context(Box<IoContext>),
|
|
||||||
Response(Box<Future<Item=HttpResponse, Error=Error>>),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<S, H: PipelineHandler<S>> Pipeline<S, H> {
|
impl<S, H: PipelineHandler<S>> Pipeline<S, H> {
|
||||||
|
|
||||||
pub fn new(req: HttpRequest<S>,
|
pub fn new(req: HttpRequest<S>,
|
||||||
@ -364,7 +358,7 @@ impl<S, H: PipelineHandler<S>> StartMiddlewares<S, H> {
|
|||||||
|
|
||||||
// waiting for response
|
// waiting for response
|
||||||
struct WaitingResponse<S, H> {
|
struct WaitingResponse<S, H> {
|
||||||
stream: PipelineResponse,
|
fut: Box<Future<Item=HttpResponse, Error=Error>>,
|
||||||
_s: PhantomData<S>,
|
_s: PhantomData<S>,
|
||||||
_h: PhantomData<H>,
|
_h: PhantomData<H>,
|
||||||
}
|
}
|
||||||
@ -377,66 +371,23 @@ impl<S, H> WaitingResponse<S, H> {
|
|||||||
match reply.into() {
|
match reply.into() {
|
||||||
ReplyItem::Message(resp) =>
|
ReplyItem::Message(resp) =>
|
||||||
RunMiddlewares::init(info, resp),
|
RunMiddlewares::init(info, resp),
|
||||||
ReplyItem::Actor(ctx) =>
|
|
||||||
PipelineState::Handler(
|
|
||||||
WaitingResponse { stream: PipelineResponse::Context(ctx),
|
|
||||||
_s: PhantomData, _h: PhantomData }),
|
|
||||||
ReplyItem::Future(fut) =>
|
ReplyItem::Future(fut) =>
|
||||||
PipelineState::Handler(
|
PipelineState::Handler(
|
||||||
WaitingResponse { stream: PipelineResponse::Response(fut),
|
WaitingResponse { fut: fut, _s: PhantomData, _h: PhantomData }),
|
||||||
_s: PhantomData, _h: PhantomData }),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll(mut self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S, H>, PipelineState<S, H>>
|
fn poll(mut self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S, H>, PipelineState<S, H>>
|
||||||
{
|
{
|
||||||
let stream = mem::replace(&mut self.stream, PipelineResponse::None);
|
match self.fut.poll() {
|
||||||
|
Ok(Async::NotReady) =>
|
||||||
match stream {
|
Err(PipelineState::Handler(self)),
|
||||||
PipelineResponse::Context(mut context) => {
|
|
||||||
loop {
|
|
||||||
match context.poll() {
|
|
||||||
Ok(Async::Ready(Some(frame))) => {
|
|
||||||
match frame {
|
|
||||||
Frame::Message(resp) => {
|
|
||||||
info.context = Some(context);
|
|
||||||
return Ok(RunMiddlewares::init(info, resp))
|
|
||||||
}
|
|
||||||
Frame::Payload(_) | Frame::Drain(_) => (),
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Ok(Async::Ready(None)) => {
|
|
||||||
error!("Unexpected eof");
|
|
||||||
let err: Error = UnexpectedTaskFrame.into();
|
|
||||||
return Ok(ProcessResponse::init(err.into()))
|
|
||||||
},
|
|
||||||
Ok(Async::NotReady) => {
|
|
||||||
self.stream = PipelineResponse::Context(context);
|
|
||||||
return Err(PipelineState::Handler(self))
|
|
||||||
},
|
|
||||||
Err(err) =>
|
|
||||||
return Ok(ProcessResponse::init(err.into()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
PipelineResponse::Response(mut fut) => {
|
|
||||||
match fut.poll() {
|
|
||||||
Ok(Async::NotReady) => {
|
|
||||||
self.stream = PipelineResponse::Response(fut);
|
|
||||||
Err(PipelineState::Handler(self))
|
|
||||||
}
|
|
||||||
Ok(Async::Ready(response)) =>
|
Ok(Async::Ready(response)) =>
|
||||||
Ok(RunMiddlewares::init(info, response)),
|
Ok(RunMiddlewares::init(info, response)),
|
||||||
Err(err) =>
|
Err(err) =>
|
||||||
Ok(ProcessResponse::init(err.into())),
|
Ok(ProcessResponse::init(err.into())),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
PipelineResponse::None => {
|
|
||||||
unreachable!("Broken internal state")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Middlewares response executor
|
/// Middlewares response executor
|
||||||
@ -554,7 +505,7 @@ impl RunningState {
|
|||||||
enum IOState {
|
enum IOState {
|
||||||
Response,
|
Response,
|
||||||
Payload(BodyStream),
|
Payload(BodyStream),
|
||||||
Context,
|
Actor(Box<ActorHttpContext>),
|
||||||
Done,
|
Done,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -588,10 +539,10 @@ impl<S, H> ProcessResponse<S, H> {
|
|||||||
};
|
};
|
||||||
|
|
||||||
match self.resp.replace_body(Body::Empty) {
|
match self.resp.replace_body(Body::Empty) {
|
||||||
Body::Streaming(stream) | Body::Upgrade(stream) =>
|
Body::Streaming(stream) =>
|
||||||
self.iostate = IOState::Payload(stream),
|
self.iostate = IOState::Payload(stream),
|
||||||
Body::StreamingContext | Body::UpgradeContext =>
|
Body::Actor(ctx) =>
|
||||||
self.iostate = IOState::Context,
|
self.iostate = IOState::Actor(ctx),
|
||||||
_ => (),
|
_ => (),
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -640,17 +591,12 @@ impl<S, H> ProcessResponse<S, H> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
IOState::Context => {
|
IOState::Actor(mut ctx) => {
|
||||||
match info.context.as_mut().unwrap().poll() {
|
match ctx.poll() {
|
||||||
Ok(Async::Ready(Some(frame))) => {
|
Ok(Async::Ready(Some(frame))) => {
|
||||||
match frame {
|
match frame {
|
||||||
Frame::Message(msg) => {
|
|
||||||
error!("Unexpected message frame {:?}", msg);
|
|
||||||
info.error = Some(UnexpectedTaskFrame.into());
|
|
||||||
return Ok(
|
|
||||||
FinishingMiddlewares::init(info, self.resp))
|
|
||||||
},
|
|
||||||
Frame::Payload(None) => {
|
Frame::Payload(None) => {
|
||||||
|
info.context = Some(ctx);
|
||||||
self.iostate = IOState::Done;
|
self.iostate = IOState::Done;
|
||||||
if let Err(err) = io.write_eof() {
|
if let Err(err) = io.write_eof() {
|
||||||
info.error = Some(err.into());
|
info.error = Some(err.into());
|
||||||
@ -660,7 +606,7 @@ impl<S, H> ProcessResponse<S, H> {
|
|||||||
break
|
break
|
||||||
},
|
},
|
||||||
Frame::Payload(Some(chunk)) => {
|
Frame::Payload(Some(chunk)) => {
|
||||||
self.iostate = IOState::Context;
|
self.iostate = IOState::Actor(ctx);
|
||||||
match io.write(chunk.as_ref()) {
|
match io.write(chunk.as_ref()) {
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
info.error = Some(err.into());
|
info.error = Some(err.into());
|
||||||
@ -678,11 +624,10 @@ impl<S, H> ProcessResponse<S, H> {
|
|||||||
},
|
},
|
||||||
Ok(Async::Ready(None)) => {
|
Ok(Async::Ready(None)) => {
|
||||||
self.iostate = IOState::Done;
|
self.iostate = IOState::Done;
|
||||||
info.context.take();
|
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
Ok(Async::NotReady) => {
|
Ok(Async::NotReady) => {
|
||||||
self.iostate = IOState::Context;
|
self.iostate = IOState::Actor(ctx);
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
|
@ -190,7 +190,7 @@ impl Pattern {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Extract pattern parameters from the text
|
/// Extract pattern parameters from the text
|
||||||
pub(crate) fn update_match_info<S>(&self, text: &str, req: &mut HttpRequest<S>) {
|
pub fn update_match_info<S>(&self, text: &str, req: &mut HttpRequest<S>) {
|
||||||
if !self.names.is_empty() {
|
if !self.names.is_empty() {
|
||||||
if let Some(captures) = self.re.captures(text) {
|
if let Some(captures) = self.re.captures(text) {
|
||||||
let mut idx = 0;
|
let mut idx = 0;
|
||||||
@ -208,8 +208,7 @@ impl Pattern {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Build pattern path.
|
/// Build pattern path.
|
||||||
pub fn path<U, I>(&self, prefix: Option<&str>, elements: U)
|
pub fn path<U, I>(&self, prefix: Option<&str>, elements: U) -> Result<String, UrlGenerationError>
|
||||||
-> Result<String, UrlGenerationError>
|
|
||||||
where U: IntoIterator<Item=I>,
|
where U: IntoIterator<Item=I>,
|
||||||
I: AsRef<str>,
|
I: AsRef<str>,
|
||||||
{
|
{
|
||||||
|
@ -41,7 +41,7 @@ use httpresponse::HttpResponse;
|
|||||||
/// # extern crate reqwest;
|
/// # extern crate reqwest;
|
||||||
/// #
|
/// #
|
||||||
/// # fn my_handler(req: HttpRequest) -> HttpResponse {
|
/// # fn my_handler(req: HttpRequest) -> HttpResponse {
|
||||||
/// # httpcodes::HTTPOk.response()
|
/// # httpcodes::HTTPOk.into()
|
||||||
/// # }
|
/// # }
|
||||||
/// #
|
/// #
|
||||||
/// # fn main() {
|
/// # fn main() {
|
||||||
@ -228,9 +228,9 @@ impl<S: 'static> Iterator for TestApp<S> {
|
|||||||
///
|
///
|
||||||
/// fn index(req: HttpRequest) -> HttpResponse {
|
/// fn index(req: HttpRequest) -> HttpResponse {
|
||||||
/// if let Some(hdr) = req.headers().get(header::CONTENT_TYPE) {
|
/// if let Some(hdr) = req.headers().get(header::CONTENT_TYPE) {
|
||||||
/// httpcodes::HTTPOk.response()
|
/// httpcodes::HTTPOk.into()
|
||||||
/// } else {
|
/// } else {
|
||||||
/// httpcodes::HTTPBadRequest.response()
|
/// httpcodes::HTTPBadRequest.into()
|
||||||
/// }
|
/// }
|
||||||
/// }
|
/// }
|
||||||
///
|
///
|
||||||
@ -365,7 +365,6 @@ impl<S> TestRequest<S> {
|
|||||||
Ok(resp) => {
|
Ok(resp) => {
|
||||||
match resp.into().into() {
|
match resp.into().into() {
|
||||||
ReplyItem::Message(resp) => Ok(resp),
|
ReplyItem::Message(resp) => Ok(resp),
|
||||||
ReplyItem::Actor(_) => panic!("Actor handler is not supported."),
|
|
||||||
ReplyItem::Future(_) => panic!("Async handler is not supported."),
|
ReplyItem::Future(_) => panic!("Async handler is not supported."),
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
26
src/ws.rs
26
src/ws.rs
@ -12,7 +12,7 @@
|
|||||||
//! use actix_web::*;
|
//! use actix_web::*;
|
||||||
//!
|
//!
|
||||||
//! // do websocket handshake and start actor
|
//! // do websocket handshake and start actor
|
||||||
//! fn ws_index(req: HttpRequest) -> Result<Reply> {
|
//! fn ws_index(req: HttpRequest) -> Result<HttpResponse> {
|
||||||
//! ws::start(req, Ws)
|
//! ws::start(req, Ws)
|
||||||
//! }
|
//! }
|
||||||
//!
|
//!
|
||||||
@ -52,13 +52,11 @@ use futures::{Async, Poll, Stream};
|
|||||||
|
|
||||||
use actix::{Actor, AsyncContext, ResponseType, StreamHandler};
|
use actix::{Actor, AsyncContext, ResponseType, StreamHandler};
|
||||||
|
|
||||||
use body::Body;
|
|
||||||
use context::HttpContext;
|
|
||||||
use handler::Reply;
|
|
||||||
use payload::ReadAny;
|
use payload::ReadAny;
|
||||||
use error::{Error, WsHandshakeError};
|
use error::{Error, WsHandshakeError};
|
||||||
|
use context::HttpContext;
|
||||||
use httprequest::HttpRequest;
|
use httprequest::HttpRequest;
|
||||||
use httpresponse::{ConnectionType, HttpResponse};
|
use httpresponse::{ConnectionType, HttpResponse, HttpResponseBuilder};
|
||||||
|
|
||||||
use wsframe;
|
use wsframe;
|
||||||
use wsproto::*;
|
use wsproto::*;
|
||||||
@ -88,17 +86,17 @@ impl ResponseType for Message {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Do websocket handshake and start actor
|
/// Do websocket handshake and start actor
|
||||||
pub fn start<A, S>(mut req: HttpRequest<S>, actor: A) -> Result<Reply, Error>
|
pub fn start<A, S>(mut req: HttpRequest<S>, actor: A) -> Result<HttpResponse, Error>
|
||||||
where A: Actor<Context=HttpContext<A, S>> + StreamHandler<Message>,
|
where A: Actor<Context=HttpContext<A, S>> + StreamHandler<Message>,
|
||||||
S: 'static
|
S: 'static
|
||||||
{
|
{
|
||||||
let resp = handshake(&req)?;
|
let mut resp = handshake(&req)?;
|
||||||
|
|
||||||
let stream = WsStream::new(req.payload_mut().readany());
|
let stream = WsStream::new(req.payload_mut().readany());
|
||||||
|
|
||||||
let mut ctx = HttpContext::new(req, actor);
|
let mut ctx = HttpContext::new(req, actor);
|
||||||
ctx.start(resp);
|
|
||||||
ctx.add_stream(stream);
|
ctx.add_stream(stream);
|
||||||
Ok(ctx.into())
|
|
||||||
|
Ok(resp.body(ctx)?)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Prepare `WebSocket` handshake response.
|
/// Prepare `WebSocket` handshake response.
|
||||||
@ -109,7 +107,7 @@ pub fn start<A, S>(mut req: HttpRequest<S>, actor: A) -> Result<Reply, Error>
|
|||||||
// /// `protocols` is a sequence of known protocols. On successful handshake,
|
// /// `protocols` is a sequence of known protocols. On successful handshake,
|
||||||
// /// the returned response headers contain the first protocol in this list
|
// /// the returned response headers contain the first protocol in this list
|
||||||
// /// which the server also knows.
|
// /// which the server also knows.
|
||||||
pub fn handshake<S>(req: &HttpRequest<S>) -> Result<HttpResponse, WsHandshakeError> {
|
pub fn handshake<S>(req: &HttpRequest<S>) -> Result<HttpResponseBuilder, WsHandshakeError> {
|
||||||
// WebSocket accepts only GET
|
// WebSocket accepts only GET
|
||||||
if *req.method() != Method::GET {
|
if *req.method() != Method::GET {
|
||||||
return Err(WsHandshakeError::GetMethodRequired)
|
return Err(WsHandshakeError::GetMethodRequired)
|
||||||
@ -163,8 +161,7 @@ pub fn handshake<S>(req: &HttpRequest<S>) -> Result<HttpResponse, WsHandshakeErr
|
|||||||
.header(header::UPGRADE, "websocket")
|
.header(header::UPGRADE, "websocket")
|
||||||
.header(header::TRANSFER_ENCODING, "chunked")
|
.header(header::TRANSFER_ENCODING, "chunked")
|
||||||
.header(SEC_WEBSOCKET_ACCEPT, key.as_str())
|
.header(SEC_WEBSOCKET_ACCEPT, key.as_str())
|
||||||
.body(Body::UpgradeContext).unwrap()
|
.take())
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Maps `Payload` stream into stream of `ws::Message` items
|
/// Maps `Payload` stream into stream of `ws::Message` items
|
||||||
@ -401,6 +398,7 @@ mod tests {
|
|||||||
header::HeaderValue::from_static("13"));
|
header::HeaderValue::from_static("13"));
|
||||||
let req = HttpRequest::new(Method::GET, Uri::from_str("/").unwrap(),
|
let req = HttpRequest::new(Method::GET, Uri::from_str("/").unwrap(),
|
||||||
Version::HTTP_11, headers, None);
|
Version::HTTP_11, headers, None);
|
||||||
assert_eq!(StatusCode::SWITCHING_PROTOCOLS, handshake(&req).unwrap().status());
|
assert_eq!(StatusCode::SWITCHING_PROTOCOLS,
|
||||||
|
handshake(&req).unwrap().finish().unwrap().status());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user