1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-27 17:22:57 +01:00

refactor http actor handling

This commit is contained in:
Nikolay Kim 2017-11-29 10:31:24 -08:00
parent 6177d86d97
commit 6f833798c7
11 changed files with 131 additions and 211 deletions

View File

@ -80,35 +80,10 @@ impl Actor for MyWebSocket {
type Context = HttpContext<Self>;
}
/// Http route handler
impl Route for MyWebSocket {
type State = ();
fn request(mut req: HttpRequest, mut ctx: HttpContext<Self>) -> Result<Reply>
{
// websocket handshake
let resp = ws::handshake(&req)?;
// send HttpResponse back to peer
ctx.start(resp);
// convert bytes stream to a stream of `ws::Message` and handle stream
ctx.add_stream(ws::WsStream::new(&mut req));
ctx.reply(MyWebSocket)
}
}
/// Standard actix's stream handler for a stream of `ws::Message`
impl StreamHandler<ws::Message> for MyWebSocket {
fn started(&mut self, ctx: &mut Self::Context) {
println!("WebSocket session openned");
}
fn finished(&mut self, ctx: &mut Self::Context) {
println!("WebSocket session closed");
}
}
impl StreamHandler<ws::Message> for MyWebSocket {}
impl Handler<ws::Message> for MyWebSocket {
fn handle(&mut self, msg: ws::Message, ctx: &mut HttpContext<Self>)
fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context)
-> Response<Self, ws::Message>
{
// process websocket messages
@ -136,7 +111,7 @@ fn main() {
// enable logger
.middleware(middlewares::Logger::default())
// websocket route
.resource("/ws/", |r| r.get::<MyWebSocket>())
.resource("/ws/", |r| r.get(|req| ws::start(req, MyWebSocket)))
.route_handler("/", StaticFiles::new("examples/static/", true)))
.serve::<_, ()>("127.0.0.1:8080").unwrap();

View File

@ -19,6 +19,7 @@ struct AppState {
fn index(req: HttpRequest<AppState>) -> HttpResponse {
println!("{:?}", req);
req.state().counter.set(req.state().counter.get() + 1);
httpcodes::HTTPOk.with_body(
format!("Num of requests: {}", req.state().counter.get()))
}
@ -30,25 +31,12 @@ struct MyWebSocket {
}
impl Actor for MyWebSocket {
type Context = HttpContext<Self>;
}
impl Route for MyWebSocket {
/// Shared application state
type State = AppState;
fn request(mut req: HttpRequest<AppState>, mut ctx: HttpContext<Self>) -> Result<Reply>
{
let resp = ws::handshake(&req)?;
ctx.start(resp);
ctx.add_stream(ws::WsStream::new(&mut req));
ctx.reply(MyWebSocket{counter: 0})
}
type Context = HttpContext<Self, AppState>;
}
impl StreamHandler<ws::Message> for MyWebSocket {}
impl Handler<ws::Message> for MyWebSocket {
fn handle(&mut self, msg: ws::Message, ctx: &mut HttpContext<Self>)
fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context)
-> Response<Self, ws::Message>
{
self.counter += 1;
@ -76,7 +64,7 @@ fn main() {
// enable logger
.middleware(middlewares::Logger::default())
// websocket route
.resource("/ws/", |r| r.get::<MyWebSocket>())
.resource("/ws/", |r| r.get(|r| ws::start(r, MyWebSocket{counter: 0})))
// register simple handler, handle all methods
.handler("/", index))
.serve::<_, ()>("127.0.0.1:8080").unwrap();

View File

@ -12,28 +12,19 @@ use actix::*;
use actix_web::*;
/// do websocket handshake and start `MyWebSocket` actor
fn ws_index(r: HttpRequest) -> Reply {
ws::start(r, MyWebSocket).into()
}
/// websocket connection is long running connection, it easier
/// to handle with an actor
struct MyWebSocket;
impl Actor for MyWebSocket {
type Context = HttpContext<Self>;
}
/// Http route handler
impl Route for MyWebSocket {
type State = ();
fn request(mut req: HttpRequest, mut ctx: HttpContext<Self>) -> Result<Reply>
{
// websocket handshake
let resp = ws::handshake(&req)?;
// send HttpResponse back to peer
ctx.start(resp);
// convert bytes stream to a stream of `ws::Message` and register it
ctx.add_stream(ws::WsStream::new(&mut req));
ctx.reply(MyWebSocket)
}
}
/// Standard actix's stream handler for a stream of `ws::Message`
impl StreamHandler<ws::Message> for MyWebSocket {
fn started(&mut self, ctx: &mut Self::Context) {
@ -74,7 +65,7 @@ fn main() {
// enable logger
.middleware(middlewares::Logger::default())
// websocket route
.resource("/ws/", |r| r.get::<MyWebSocket>())
.resource("/ws/", |r| r.get(ws_index))
.route_handler("/", StaticFiles::new("examples/static/", true)))
// start http server on 127.0.0.1:8080
.serve::<_, ()>("127.0.0.1:8080").unwrap();

View File

@ -132,23 +132,10 @@ impl<S> ApplicationBuilder<S> where S: 'static {
/// use actix::*;
/// use actix_web::*;
///
/// struct MyRoute;
///
/// impl Actor for MyRoute {
/// type Context = HttpContext<Self>;
/// }
///
/// impl Route for MyRoute {
/// type State = ();
///
/// fn request(req: HttpRequest, ctx: HttpContext<Self>) -> Result<Reply> {
/// Reply::reply(httpcodes::HTTPOk)
/// }
/// }
/// fn main() {
/// let app = Application::default("/")
/// .resource("/test", |r| {
/// r.get::<MyRoute>();
/// r.get(|req| httpcodes::HTTPOk);
/// r.handler(Method::HEAD, |req| httpcodes::HTTPMethodNotAllowed);
/// })
/// .finish();

View File

@ -14,26 +14,27 @@ use actix::dev::{AsyncContextApi, ActorAddressCell, ActorItemsCell, ActorWaitCel
use task::{IoContext, DrainFut};
use body::Binary;
use error::{Error, Result as ActixResult};
use route::{Route, Frame, Reply};
use error::Error;
use route::Frame;
use httprequest::HttpRequest;
use httpresponse::HttpResponse;
/// Http actor execution context
pub struct HttpContext<A> where A: Actor<Context=HttpContext<A>> + Route,
pub struct HttpContext<A, S=()> where A: Actor<Context=HttpContext<A, S>>,
{
act: Option<A>,
act: A,
state: ActorState,
modified: bool,
items: ActorItemsCell<A>,
address: ActorAddressCell<A>,
stream: VecDeque<Frame>,
wait: ActorWaitCell<A>,
app_state: Rc<<A as Route>::State>,
request: HttpRequest<S>,
disconnected: bool,
}
impl<A> IoContext for HttpContext<A> where A: Actor<Context=Self> + Route {
impl<A, S> IoContext for HttpContext<A, S> where A: Actor<Context=Self>, S: 'static {
fn disconnected(&mut self) {
self.items.stop();
@ -44,7 +45,7 @@ impl<A> IoContext for HttpContext<A> where A: Actor<Context=Self> + Route {
}
}
impl<A> ActorContext for HttpContext<A> where A: Actor<Context=Self> + Route
impl<A, S> ActorContext for HttpContext<A, S> where A: Actor<Context=Self>
{
/// Stop actor execution
fn stop(&mut self) {
@ -69,7 +70,7 @@ impl<A> ActorContext for HttpContext<A> where A: Actor<Context=Self> + Route
}
}
impl<A> AsyncContext<A> for HttpContext<A> where A: Actor<Context=Self> + Route
impl<A, S> AsyncContext<A> for HttpContext<A, S> where A: Actor<Context=Self>
{
fn spawn<F>(&mut self, fut: F) -> SpawnHandle
where F: ActorFuture<Item=(), Error=(), Actor=A> + 'static
@ -96,41 +97,43 @@ impl<A> AsyncContext<A> for HttpContext<A> where A: Actor<Context=Self> + Route
}
#[doc(hidden)]
impl<A> AsyncContextApi<A> for HttpContext<A> where A: Actor<Context=Self> + Route {
impl<A, S> AsyncContextApi<A> for HttpContext<A, S> where A: Actor<Context=Self> {
fn address_cell(&mut self) -> &mut ActorAddressCell<A> {
&mut self.address
}
}
impl<A> HttpContext<A> where A: Actor<Context=Self> + Route {
impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> {
pub fn new(state: Rc<<A as Route>::State>) -> HttpContext<A>
pub fn new(req: HttpRequest<S>, actor: A) -> HttpContext<A, S>
{
HttpContext {
act: None,
act: actor,
state: ActorState::Started,
modified: false,
items: ActorItemsCell::default(),
address: ActorAddressCell::default(),
wait: ActorWaitCell::default(),
stream: VecDeque::new(),
app_state: state,
request: req,
disconnected: false,
}
}
pub(crate) fn set_actor(&mut self, act: A) {
self.act = Some(act)
}
}
impl<A> HttpContext<A> where A: Actor<Context=Self> + Route {
impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> {
/// Shared application state
pub fn state(&self) -> &<A as Route>::State {
&self.app_state
pub fn state(&self) -> &S {
self.request.state()
}
/// Incoming request
pub fn request(&mut self) -> &mut HttpRequest<S> {
&mut self.request
}
/// Start response processing
pub fn start<R: Into<HttpResponse>>(&mut self, response: R) {
self.stream.push_back(Frame::Message(response.into()))
@ -158,14 +161,9 @@ impl<A> HttpContext<A> where A: Actor<Context=Self> + Route {
pub fn connected(&self) -> bool {
!self.disconnected
}
pub fn reply(mut self, actor: A) -> ActixResult<Reply> {
self.set_actor(actor);
Reply::async(self)
}
}
impl<A> HttpContext<A> where A: Actor<Context=Self> + Route {
impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> {
#[doc(hidden)]
pub fn subscriber<M>(&mut self) -> Box<Subscriber<M>>
@ -187,20 +185,17 @@ impl<A> HttpContext<A> where A: Actor<Context=Self> + Route {
}
#[doc(hidden)]
impl<A> Stream for HttpContext<A> where A: Actor<Context=Self> + Route
impl<A, S> Stream for HttpContext<A, S> where A: Actor<Context=Self>
{
type Item = Frame;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Frame>, Error> {
if self.act.is_none() {
return Ok(Async::NotReady)
}
let act: &mut A = unsafe {
std::mem::transmute(self.act.as_mut().unwrap() as &mut A)
std::mem::transmute(&mut self.act as &mut A)
};
let ctx: &mut HttpContext<A> = unsafe {
std::mem::transmute(self as &mut HttpContext<A>)
let ctx: &mut HttpContext<A, S> = unsafe {
std::mem::transmute(self as &mut HttpContext<A, S>)
};
// update state
@ -283,8 +278,8 @@ impl<A> Stream for HttpContext<A> where A: Actor<Context=Self> + Route
}
}
impl<A> ToEnvelope<A> for HttpContext<A>
where A: Actor<Context=HttpContext<A>> + Route,
impl<A, S> ToEnvelope<A> for HttpContext<A, S>
where A: Actor<Context=HttpContext<A, S>>,
{
fn pack<M>(msg: M, tx: Option<Sender<Result<M::Item, M::Error>>>) -> Envelope<A>
where A: Handler<M>,

View File

@ -11,7 +11,6 @@
// dev specific
pub use task::Task;
pub use pipeline::Pipeline;
pub use route::RouteFactory;
pub use recognizer::RouteRecognizer;
pub use channel::HttpChannel;

View File

@ -95,11 +95,6 @@ impl<S> HttpRequest<S> {
&self.1
}
/// Clone application state
pub(crate) fn clone_state(&self) -> Rc<S> {
Rc::clone(&self.1)
}
/// Protocol extensions.
#[inline]
pub fn extensions(&mut self) -> &mut Extensions {

View File

@ -82,7 +82,7 @@ pub use application::Application;
pub use httprequest::{HttpRequest, UrlEncoded};
pub use httpresponse::HttpResponse;
pub use payload::{Payload, PayloadItem};
pub use route::{Frame, Route, RouteFactory, RouteHandler, Reply};
pub use route::{Frame, RouteHandler, Reply};
pub use resource::Resource;
pub use recognizer::Params;
pub use server::HttpServer;

View File

@ -1,14 +1,12 @@
use std::marker::PhantomData;
use std::collections::HashMap;
use actix::Actor;
use http::Method;
use futures::Stream;
use task::Task;
use error::Error;
use route::{Reply, Route, RouteHandler, Frame, FnHandler, StreamHandler};
use context::HttpContext;
use route::{Reply, RouteHandler, Frame, FnHandler, StreamHandler};
use httprequest::HttpRequest;
use httpcodes::{HTTPNotFound, HTTPMethodNotAllowed};
@ -92,31 +90,31 @@ impl<S> Resource<S> where S: 'static {
}
/// Handler for `GET` method.
pub fn get<A>(&mut self)
where A: Actor<Context=HttpContext<A>> + Route<State=S>
{
self.route_handler(Method::GET, A::factory());
pub fn get<F, R>(&mut self, handler: F)
where F: Fn(HttpRequest<S>) -> R + 'static,
R: Into<Reply> + 'static, {
self.routes.insert(Method::GET, Box::new(FnHandler::new(handler)));
}
/// Handler for `POST` method.
pub fn post<A>(&mut self)
where A: Actor<Context=HttpContext<A>> + Route<State=S>
{
self.route_handler(Method::POST, A::factory());
pub fn post<F, R>(&mut self, handler: F)
where F: Fn(HttpRequest<S>) -> R + 'static,
R: Into<Reply> + 'static, {
self.routes.insert(Method::POST, Box::new(FnHandler::new(handler)));
}
/// Handler for `PUR` method.
pub fn put<A>(&mut self)
where A: Actor<Context=HttpContext<A>> + Route<State=S>
{
self.route_handler(Method::PUT, A::factory());
/// Handler for `PUT` method.
pub fn put<F, R>(&mut self, handler: F)
where F: Fn(HttpRequest<S>) -> R + 'static,
R: Into<Reply> + 'static, {
self.routes.insert(Method::PUT, Box::new(FnHandler::new(handler)));
}
/// Handler for `METHOD` method.
pub fn delete<A>(&mut self)
where A: Actor<Context=HttpContext<A>> + Route<State=S>
{
self.route_handler(Method::DELETE, A::factory());
/// Handler for `DELETE` method.
pub fn delete<F, R>(&mut self, handler: F)
where F: Fn(HttpRequest<S>) -> R + 'static,
R: Into<Reply> + 'static, {
self.routes.insert(Method::DELETE, Box::new(FnHandler::new(handler)));
}
}

View File

@ -1,14 +1,15 @@
use std::rc::Rc;
use std::cell::RefCell;
use std::marker::PhantomData;
use std::result::Result as StdResult;
use actix::Actor;
use http::{header, Version};
// use http::{header, Version};
use futures::Stream;
use task::{Task, DrainFut, IoContext};
use body::Binary;
use error::{Error, ExpectError, Result};
use error::{Error}; //, ExpectError, Result};
use context::HttpContext;
use httprequest::HttpRequest;
use httpresponse::HttpResponse;
@ -37,9 +38,10 @@ pub trait RouteHandler<S>: 'static {
fn set_prefix(&mut self, prefix: String) {}
}
/*
/// Actors with ability to handle http requests.
#[allow(unused_variables)]
pub trait Route: Actor {
pub trait RouteState {
/// Shared state. State is shared with all routes within same application
/// and could be accessed with `HttpRequest::state()` method.
type State;
@ -69,42 +71,13 @@ pub trait Route: Actor {
}
}
/// Handle incoming request. Route actor can return
/// result immediately with `Reply::reply`.
/// Actor itself can be returned with `Reply::stream` for handling streaming
/// request/response or websocket connection.
/// In that case `HttpContext::start` and `HttpContext::write` has to be used
/// for writing response.
fn request(req: HttpRequest<Self::State>, ctx: Self::Context) -> Result<Reply>;
/// This method creates `RouteFactory` for this actor.
fn factory() -> RouteFactory<Self, Self::State> {
RouteFactory(PhantomData)
/// Handle incoming request with http actor.
fn handle(req: HttpRequest<Self::State>) -> Result<Reply>
where Self: Default, Self: Actor<Context=HttpContext<Self>>
{
Ok(HttpContext::new(req, Self::default()).into())
}
}
/// This is used for routes registration within `Resource`
pub struct RouteFactory<A: Route<State=S>, S>(PhantomData<A>);
impl<A, S> RouteHandler<S> for RouteFactory<A, S>
where A: Actor<Context=HttpContext<A>> + Route<State=S>,
S: 'static
{
fn handle(&self, mut req: HttpRequest<A::State>, task: &mut Task) {
let mut ctx = HttpContext::new(req.clone_state());
// handle EXPECT header
if req.headers().contains_key(header::EXPECT) {
if let Err(resp) = A::expect(&mut req, &mut ctx) {
task.reply(resp)
}
}
match A::request(req, ctx) {
Ok(reply) => reply.into(task),
Err(err) => task.reply(err),
}
}
}
}*/
/// Fn() route handler
pub(crate)
@ -180,20 +153,22 @@ pub struct Reply(ReplyItem);
impl Reply
{
/// Create actor response
pub(crate) fn async<C: IoContext>(ctx: C) -> Result<Reply> {
Ok(Reply(ReplyItem::Actor(Box::new(ctx))))
pub fn actor<A, S>(ctx: HttpContext<A, S>) -> Reply
where A: Actor<Context=HttpContext<A, S>>, S: 'static
{
Reply(ReplyItem::Actor(Box::new(ctx)))
}
/// Create async response
pub fn stream<S>(stream: S) -> Result<Reply>
pub fn stream<S>(stream: S) -> Reply
where S: Stream<Item=Frame, Error=Error> + 'static
{
Ok(Reply(ReplyItem::Stream(Box::new(stream))))
Reply(ReplyItem::Stream(Box::new(stream)))
}
/// Send response
pub fn reply<R: Into<HttpResponse>>(response: R) -> Result<Reply> {
Ok(Reply(ReplyItem::Message(response.into())))
pub fn reply<R: Into<HttpResponse>>(response: R) -> Reply {
Reply(ReplyItem::Message(response.into()))
}
pub fn into(self, task: &mut Task)
@ -218,3 +193,19 @@ impl<T: Into<HttpResponse>> From<T> for Reply
Reply(ReplyItem::Message(item.into()))
}
}
impl<E: Into<Error>> From<StdResult<Reply, E>> for Reply {
fn from(res: StdResult<Reply, E>) -> Self {
match res {
Ok(val) => val,
Err(err) => err.into().into(),
}
}
}
impl<A: Actor<Context=HttpContext<A, S>>, S: 'static> From<HttpContext<A, S>> for Reply
{
fn from(item: HttpContext<A, S>) -> Self {
Reply(ReplyItem::Actor(Box::new(item)))
}
}

View File

@ -12,6 +12,10 @@
//! use actix::*;
//! use actix_web::*;
//!
//! fn ws_index(req: HttpRequest) -> Result<Reply> {
//! ws::start(req, WsRoute)
//! }
//!
//! // WebSocket Route
//! struct WsRoute;
//!
@ -19,22 +23,6 @@
//! type Context = HttpContext<Self>;
//! }
//!
//! impl Route for WsRoute {
//! type State = ();
//!
//! fn request(mut req: HttpRequest, mut ctx: HttpContext<Self>) -> Result<Reply>
//! {
//! // WebSocket handshake
//! let resp = ws::handshake(&req)?;
//! // Send handshake response to peer
//! ctx.start(resp);
//! // Map Payload into WsStream
//! ctx.add_stream(ws::WsStream::new(&mut req));
//! // Start ws messages processing
//! ctx.reply(WsRoute)
//! }
//! }
//!
//! // Define Handler for ws::Message message
//! impl StreamHandler<ws::Message> for WsRoute {}
//!
@ -59,13 +47,13 @@ use http::{Method, StatusCode, header};
use bytes::BytesMut;
use futures::{Async, Poll, Stream};
use actix::{Actor, ResponseType};
use actix::{Actor, AsyncContext, ResponseType, StreamHandler};
use body::Body;
use context::HttpContext;
use route::Route;
use route::Reply;
use payload::Payload;
use error::WsHandshakeError;
use error::{Error, WsHandshakeError};
use httprequest::HttpRequest;
use httpresponse::{ConnectionType, HttpResponse};
@ -100,6 +88,19 @@ impl ResponseType for Message {
type Error = ();
}
pub fn start<A, S>(mut req: HttpRequest<S>, actor: A) -> Result<Reply, Error>
where A: Actor<Context=HttpContext<A, S>> + StreamHandler<Message>,
S: 'static
{
let resp = handshake(&req)?;
let stream = WsStream::new(&mut req);
let mut ctx = HttpContext::new(req, actor);
ctx.start(resp);
ctx.add_stream(stream);
Ok(ctx.into())
}
/// Prepare `WebSocket` handshake response.
///
/// This function returns handshake `HttpResponse`, ready to send to peer.
@ -271,8 +272,8 @@ pub struct WsWriter;
impl WsWriter {
/// Send text frame
pub fn text<A>(ctx: &mut HttpContext<A>, text: &str)
where A: Actor<Context=HttpContext<A>> + Route
pub fn text<A, S>(ctx: &mut HttpContext<A, S>, text: &str)
where A: Actor<Context=HttpContext<A, S>>
{
let mut frame = wsframe::Frame::message(Vec::from(text), OpCode::Text, true);
let mut buf = Vec::new();
@ -282,8 +283,8 @@ impl WsWriter {
}
/// Send binary frame
pub fn binary<A>(ctx: &mut HttpContext<A>, data: Vec<u8>)
where A: Actor<Context=HttpContext<A>> + Route
pub fn binary<A, S>(ctx: &mut HttpContext<A, S>, data: Vec<u8>)
where A: Actor<Context=HttpContext<A, S>>
{
let mut frame = wsframe::Frame::message(data, OpCode::Binary, true);
let mut buf = Vec::new();
@ -293,8 +294,8 @@ impl WsWriter {
}
/// Send ping frame
pub fn ping<A>(ctx: &mut HttpContext<A>, message: &str)
where A: Actor<Context=HttpContext<A>> + Route
pub fn ping<A, S>(ctx: &mut HttpContext<A, S>, message: &str)
where A: Actor<Context=HttpContext<A, S>>
{
let mut frame = wsframe::Frame::message(Vec::from(message), OpCode::Ping, true);
let mut buf = Vec::new();
@ -304,8 +305,8 @@ impl WsWriter {
}
/// Send pong frame
pub fn pong<A>(ctx: &mut HttpContext<A>, message: &str)
where A: Actor<Context=HttpContext<A>> + Route
pub fn pong<A, S>(ctx: &mut HttpContext<A, S>, message: &str)
where A: Actor<Context=HttpContext<A, S>>
{
let mut frame = wsframe::Frame::message(Vec::from(message), OpCode::Pong, true);
let mut buf = Vec::new();
@ -315,8 +316,8 @@ impl WsWriter {
}
/// Send close frame
pub fn close<A>(ctx: &mut HttpContext<A>, code: CloseCode, reason: &str)
where A: Actor<Context=HttpContext<A>> + Route
pub fn close<A, S>(ctx: &mut HttpContext<A, S>, code: CloseCode, reason: &str)
where A: Actor<Context=HttpContext<A, S>>
{
let mut frame = wsframe::Frame::close(code, reason);
let mut buf = Vec::new();