From 6f833798c7b3b40021f63d6447317855f824ab23 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 29 Nov 2017 10:31:24 -0800 Subject: [PATCH] refactor http actor handling --- README.md | 31 ++--------------- examples/state.rs | 20 +++-------- examples/websocket.rs | 25 +++++--------- src/application.rs | 15 +------- src/context.rs | 65 ++++++++++++++++------------------- src/dev.rs | 1 - src/httprequest.rs | 5 --- src/lib.rs | 2 +- src/resource.rs | 40 +++++++++++----------- src/route.rs | 79 +++++++++++++++++++------------------------ src/ws.rs | 59 ++++++++++++++++---------------- 11 files changed, 131 insertions(+), 211 deletions(-) diff --git a/README.md b/README.md index fdc13266..b24a310a 100644 --- a/README.md +++ b/README.md @@ -80,35 +80,10 @@ impl Actor for MyWebSocket { type Context = HttpContext; } -/// Http route handler -impl Route for MyWebSocket { - type State = (); - - fn request(mut req: HttpRequest, mut ctx: HttpContext) -> Result - { - // websocket handshake - let resp = ws::handshake(&req)?; - // send HttpResponse back to peer - ctx.start(resp); - // convert bytes stream to a stream of `ws::Message` and handle stream - ctx.add_stream(ws::WsStream::new(&mut req)); - ctx.reply(MyWebSocket) - } -} - /// Standard actix's stream handler for a stream of `ws::Message` -impl StreamHandler for MyWebSocket { - fn started(&mut self, ctx: &mut Self::Context) { - println!("WebSocket session openned"); - } - - fn finished(&mut self, ctx: &mut Self::Context) { - println!("WebSocket session closed"); - } -} - +impl StreamHandler for MyWebSocket {} impl Handler for MyWebSocket { - fn handle(&mut self, msg: ws::Message, ctx: &mut HttpContext) + fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) -> Response { // process websocket messages @@ -136,7 +111,7 @@ fn main() { // enable logger .middleware(middlewares::Logger::default()) // websocket route - .resource("/ws/", |r| r.get::()) + .resource("/ws/", |r| r.get(|req| ws::start(req, MyWebSocket))) .route_handler("/", StaticFiles::new("examples/static/", true))) .serve::<_, ()>("127.0.0.1:8080").unwrap(); diff --git a/examples/state.rs b/examples/state.rs index 2b7f8a2e..52a2da79 100644 --- a/examples/state.rs +++ b/examples/state.rs @@ -19,6 +19,7 @@ struct AppState { fn index(req: HttpRequest) -> HttpResponse { println!("{:?}", req); req.state().counter.set(req.state().counter.get() + 1); + httpcodes::HTTPOk.with_body( format!("Num of requests: {}", req.state().counter.get())) } @@ -30,25 +31,12 @@ struct MyWebSocket { } impl Actor for MyWebSocket { - type Context = HttpContext; -} - -impl Route for MyWebSocket { - /// Shared application state - type State = AppState; - - fn request(mut req: HttpRequest, mut ctx: HttpContext) -> Result - { - let resp = ws::handshake(&req)?; - ctx.start(resp); - ctx.add_stream(ws::WsStream::new(&mut req)); - ctx.reply(MyWebSocket{counter: 0}) - } + type Context = HttpContext; } impl StreamHandler for MyWebSocket {} impl Handler for MyWebSocket { - fn handle(&mut self, msg: ws::Message, ctx: &mut HttpContext) + fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) -> Response { self.counter += 1; @@ -76,7 +64,7 @@ fn main() { // enable logger .middleware(middlewares::Logger::default()) // websocket route - .resource("/ws/", |r| r.get::()) + .resource("/ws/", |r| r.get(|r| ws::start(r, MyWebSocket{counter: 0}))) // register simple handler, handle all methods .handler("/", index)) .serve::<_, ()>("127.0.0.1:8080").unwrap(); diff --git a/examples/websocket.rs b/examples/websocket.rs index 15016601..f8f4e567 100644 --- a/examples/websocket.rs +++ b/examples/websocket.rs @@ -12,28 +12,19 @@ use actix::*; use actix_web::*; +/// do websocket handshake and start `MyWebSocket` actor +fn ws_index(r: HttpRequest) -> Reply { + ws::start(r, MyWebSocket).into() +} + +/// websocket connection is long running connection, it easier +/// to handle with an actor struct MyWebSocket; impl Actor for MyWebSocket { type Context = HttpContext; } -/// Http route handler -impl Route for MyWebSocket { - type State = (); - - fn request(mut req: HttpRequest, mut ctx: HttpContext) -> Result - { - // websocket handshake - let resp = ws::handshake(&req)?; - // send HttpResponse back to peer - ctx.start(resp); - // convert bytes stream to a stream of `ws::Message` and register it - ctx.add_stream(ws::WsStream::new(&mut req)); - ctx.reply(MyWebSocket) - } -} - /// Standard actix's stream handler for a stream of `ws::Message` impl StreamHandler for MyWebSocket { fn started(&mut self, ctx: &mut Self::Context) { @@ -74,7 +65,7 @@ fn main() { // enable logger .middleware(middlewares::Logger::default()) // websocket route - .resource("/ws/", |r| r.get::()) + .resource("/ws/", |r| r.get(ws_index)) .route_handler("/", StaticFiles::new("examples/static/", true))) // start http server on 127.0.0.1:8080 .serve::<_, ()>("127.0.0.1:8080").unwrap(); diff --git a/src/application.rs b/src/application.rs index cbe92525..08160edb 100644 --- a/src/application.rs +++ b/src/application.rs @@ -132,23 +132,10 @@ impl ApplicationBuilder where S: 'static { /// use actix::*; /// use actix_web::*; /// - /// struct MyRoute; - /// - /// impl Actor for MyRoute { - /// type Context = HttpContext; - /// } - /// - /// impl Route for MyRoute { - /// type State = (); - /// - /// fn request(req: HttpRequest, ctx: HttpContext) -> Result { - /// Reply::reply(httpcodes::HTTPOk) - /// } - /// } /// fn main() { /// let app = Application::default("/") /// .resource("/test", |r| { - /// r.get::(); + /// r.get(|req| httpcodes::HTTPOk); /// r.handler(Method::HEAD, |req| httpcodes::HTTPMethodNotAllowed); /// }) /// .finish(); diff --git a/src/context.rs b/src/context.rs index a0bf97fb..8623c713 100644 --- a/src/context.rs +++ b/src/context.rs @@ -14,26 +14,27 @@ use actix::dev::{AsyncContextApi, ActorAddressCell, ActorItemsCell, ActorWaitCel use task::{IoContext, DrainFut}; use body::Binary; -use error::{Error, Result as ActixResult}; -use route::{Route, Frame, Reply}; +use error::Error; +use route::Frame; +use httprequest::HttpRequest; use httpresponse::HttpResponse; /// Http actor execution context -pub struct HttpContext where A: Actor> + Route, +pub struct HttpContext where A: Actor>, { - act: Option, + act: A, state: ActorState, modified: bool, items: ActorItemsCell, address: ActorAddressCell, stream: VecDeque, wait: ActorWaitCell, - app_state: Rc<::State>, + request: HttpRequest, disconnected: bool, } -impl IoContext for HttpContext where A: Actor + Route { +impl IoContext for HttpContext where A: Actor, S: 'static { fn disconnected(&mut self) { self.items.stop(); @@ -44,7 +45,7 @@ impl IoContext for HttpContext where A: Actor + Route { } } -impl ActorContext for HttpContext where A: Actor + Route +impl ActorContext for HttpContext where A: Actor { /// Stop actor execution fn stop(&mut self) { @@ -69,7 +70,7 @@ impl ActorContext for HttpContext where A: Actor + Route } } -impl AsyncContext for HttpContext where A: Actor + Route +impl AsyncContext for HttpContext where A: Actor { fn spawn(&mut self, fut: F) -> SpawnHandle where F: ActorFuture + 'static @@ -96,41 +97,43 @@ impl AsyncContext for HttpContext where A: Actor + Route } #[doc(hidden)] -impl AsyncContextApi for HttpContext where A: Actor + Route { +impl AsyncContextApi for HttpContext where A: Actor { fn address_cell(&mut self) -> &mut ActorAddressCell { &mut self.address } } -impl HttpContext where A: Actor + Route { +impl HttpContext where A: Actor { - pub fn new(state: Rc<::State>) -> HttpContext + pub fn new(req: HttpRequest, actor: A) -> HttpContext { HttpContext { - act: None, + act: actor, state: ActorState::Started, modified: false, items: ActorItemsCell::default(), address: ActorAddressCell::default(), wait: ActorWaitCell::default(), stream: VecDeque::new(), - app_state: state, + request: req, disconnected: false, } } - - pub(crate) fn set_actor(&mut self, act: A) { - self.act = Some(act) - } } -impl HttpContext where A: Actor + Route { +impl HttpContext where A: Actor { /// Shared application state - pub fn state(&self) -> &::State { - &self.app_state + pub fn state(&self) -> &S { + self.request.state() } + /// Incoming request + pub fn request(&mut self) -> &mut HttpRequest { + &mut self.request + } + + /// Start response processing pub fn start>(&mut self, response: R) { self.stream.push_back(Frame::Message(response.into())) @@ -158,14 +161,9 @@ impl HttpContext where A: Actor + Route { pub fn connected(&self) -> bool { !self.disconnected } - - pub fn reply(mut self, actor: A) -> ActixResult { - self.set_actor(actor); - Reply::async(self) - } } -impl HttpContext where A: Actor + Route { +impl HttpContext where A: Actor { #[doc(hidden)] pub fn subscriber(&mut self) -> Box> @@ -187,20 +185,17 @@ impl HttpContext where A: Actor + Route { } #[doc(hidden)] -impl Stream for HttpContext where A: Actor + Route +impl Stream for HttpContext where A: Actor { type Item = Frame; type Error = Error; fn poll(&mut self) -> Poll, Error> { - if self.act.is_none() { - return Ok(Async::NotReady) - } let act: &mut A = unsafe { - std::mem::transmute(self.act.as_mut().unwrap() as &mut A) + std::mem::transmute(&mut self.act as &mut A) }; - let ctx: &mut HttpContext = unsafe { - std::mem::transmute(self as &mut HttpContext) + let ctx: &mut HttpContext = unsafe { + std::mem::transmute(self as &mut HttpContext) }; // update state @@ -283,8 +278,8 @@ impl Stream for HttpContext where A: Actor + Route } } -impl ToEnvelope for HttpContext - where A: Actor> + Route, +impl ToEnvelope for HttpContext + where A: Actor>, { fn pack(msg: M, tx: Option>>) -> Envelope where A: Handler, diff --git a/src/dev.rs b/src/dev.rs index 53cb546e..8ec4a50d 100644 --- a/src/dev.rs +++ b/src/dev.rs @@ -11,7 +11,6 @@ // dev specific pub use task::Task; pub use pipeline::Pipeline; -pub use route::RouteFactory; pub use recognizer::RouteRecognizer; pub use channel::HttpChannel; diff --git a/src/httprequest.rs b/src/httprequest.rs index 3c0f6edc..f2d9904c 100644 --- a/src/httprequest.rs +++ b/src/httprequest.rs @@ -95,11 +95,6 @@ impl HttpRequest { &self.1 } - /// Clone application state - pub(crate) fn clone_state(&self) -> Rc { - Rc::clone(&self.1) - } - /// Protocol extensions. #[inline] pub fn extensions(&mut self) -> &mut Extensions { diff --git a/src/lib.rs b/src/lib.rs index 84fe563a..7efa9aa8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -82,7 +82,7 @@ pub use application::Application; pub use httprequest::{HttpRequest, UrlEncoded}; pub use httpresponse::HttpResponse; pub use payload::{Payload, PayloadItem}; -pub use route::{Frame, Route, RouteFactory, RouteHandler, Reply}; +pub use route::{Frame, RouteHandler, Reply}; pub use resource::Resource; pub use recognizer::Params; pub use server::HttpServer; diff --git a/src/resource.rs b/src/resource.rs index a7cdece1..93d31cb3 100644 --- a/src/resource.rs +++ b/src/resource.rs @@ -1,14 +1,12 @@ use std::marker::PhantomData; use std::collections::HashMap; -use actix::Actor; use http::Method; use futures::Stream; use task::Task; use error::Error; -use route::{Reply, Route, RouteHandler, Frame, FnHandler, StreamHandler}; -use context::HttpContext; +use route::{Reply, RouteHandler, Frame, FnHandler, StreamHandler}; use httprequest::HttpRequest; use httpcodes::{HTTPNotFound, HTTPMethodNotAllowed}; @@ -92,31 +90,31 @@ impl Resource where S: 'static { } /// Handler for `GET` method. - pub fn get(&mut self) - where A: Actor> + Route - { - self.route_handler(Method::GET, A::factory()); + pub fn get(&mut self, handler: F) + where F: Fn(HttpRequest) -> R + 'static, + R: Into + 'static, { + self.routes.insert(Method::GET, Box::new(FnHandler::new(handler))); } /// Handler for `POST` method. - pub fn post(&mut self) - where A: Actor> + Route - { - self.route_handler(Method::POST, A::factory()); + pub fn post(&mut self, handler: F) + where F: Fn(HttpRequest) -> R + 'static, + R: Into + 'static, { + self.routes.insert(Method::POST, Box::new(FnHandler::new(handler))); } - /// Handler for `PUR` method. - pub fn put(&mut self) - where A: Actor> + Route - { - self.route_handler(Method::PUT, A::factory()); + /// Handler for `PUT` method. + pub fn put(&mut self, handler: F) + where F: Fn(HttpRequest) -> R + 'static, + R: Into + 'static, { + self.routes.insert(Method::PUT, Box::new(FnHandler::new(handler))); } - /// Handler for `METHOD` method. - pub fn delete(&mut self) - where A: Actor> + Route - { - self.route_handler(Method::DELETE, A::factory()); + /// Handler for `DELETE` method. + pub fn delete(&mut self, handler: F) + where F: Fn(HttpRequest) -> R + 'static, + R: Into + 'static, { + self.routes.insert(Method::DELETE, Box::new(FnHandler::new(handler))); } } diff --git a/src/route.rs b/src/route.rs index 98701375..3842c769 100644 --- a/src/route.rs +++ b/src/route.rs @@ -1,14 +1,15 @@ use std::rc::Rc; use std::cell::RefCell; use std::marker::PhantomData; +use std::result::Result as StdResult; use actix::Actor; -use http::{header, Version}; +// use http::{header, Version}; use futures::Stream; use task::{Task, DrainFut, IoContext}; use body::Binary; -use error::{Error, ExpectError, Result}; +use error::{Error}; //, ExpectError, Result}; use context::HttpContext; use httprequest::HttpRequest; use httpresponse::HttpResponse; @@ -37,9 +38,10 @@ pub trait RouteHandler: 'static { fn set_prefix(&mut self, prefix: String) {} } +/* /// Actors with ability to handle http requests. #[allow(unused_variables)] -pub trait Route: Actor { +pub trait RouteState { /// Shared state. State is shared with all routes within same application /// and could be accessed with `HttpRequest::state()` method. type State; @@ -69,42 +71,13 @@ pub trait Route: Actor { } } - /// Handle incoming request. Route actor can return - /// result immediately with `Reply::reply`. - /// Actor itself can be returned with `Reply::stream` for handling streaming - /// request/response or websocket connection. - /// In that case `HttpContext::start` and `HttpContext::write` has to be used - /// for writing response. - fn request(req: HttpRequest, ctx: Self::Context) -> Result; - - /// This method creates `RouteFactory` for this actor. - fn factory() -> RouteFactory { - RouteFactory(PhantomData) + /// Handle incoming request with http actor. + fn handle(req: HttpRequest) -> Result + where Self: Default, Self: Actor> + { + Ok(HttpContext::new(req, Self::default()).into()) } -} - -/// This is used for routes registration within `Resource` -pub struct RouteFactory, S>(PhantomData); - -impl RouteHandler for RouteFactory - where A: Actor> + Route, - S: 'static -{ - fn handle(&self, mut req: HttpRequest, task: &mut Task) { - let mut ctx = HttpContext::new(req.clone_state()); - - // handle EXPECT header - if req.headers().contains_key(header::EXPECT) { - if let Err(resp) = A::expect(&mut req, &mut ctx) { - task.reply(resp) - } - } - match A::request(req, ctx) { - Ok(reply) => reply.into(task), - Err(err) => task.reply(err), - } - } -} +}*/ /// Fn() route handler pub(crate) @@ -180,20 +153,22 @@ pub struct Reply(ReplyItem); impl Reply { /// Create actor response - pub(crate) fn async(ctx: C) -> Result { - Ok(Reply(ReplyItem::Actor(Box::new(ctx)))) + pub fn actor(ctx: HttpContext) -> Reply + where A: Actor>, S: 'static + { + Reply(ReplyItem::Actor(Box::new(ctx))) } /// Create async response - pub fn stream(stream: S) -> Result + pub fn stream(stream: S) -> Reply where S: Stream + 'static { - Ok(Reply(ReplyItem::Stream(Box::new(stream)))) + Reply(ReplyItem::Stream(Box::new(stream))) } /// Send response - pub fn reply>(response: R) -> Result { - Ok(Reply(ReplyItem::Message(response.into()))) + pub fn reply>(response: R) -> Reply { + Reply(ReplyItem::Message(response.into())) } pub fn into(self, task: &mut Task) @@ -218,3 +193,19 @@ impl> From for Reply Reply(ReplyItem::Message(item.into())) } } + +impl> From> for Reply { + fn from(res: StdResult) -> Self { + match res { + Ok(val) => val, + Err(err) => err.into().into(), + } + } +} + +impl>, S: 'static> From> for Reply +{ + fn from(item: HttpContext) -> Self { + Reply(ReplyItem::Actor(Box::new(item))) + } +} diff --git a/src/ws.rs b/src/ws.rs index 201197e4..faea7d08 100644 --- a/src/ws.rs +++ b/src/ws.rs @@ -12,6 +12,10 @@ //! use actix::*; //! use actix_web::*; //! +//! fn ws_index(req: HttpRequest) -> Result { +//! ws::start(req, WsRoute) +//! } +//! //! // WebSocket Route //! struct WsRoute; //! @@ -19,22 +23,6 @@ //! type Context = HttpContext; //! } //! -//! impl Route for WsRoute { -//! type State = (); -//! -//! fn request(mut req: HttpRequest, mut ctx: HttpContext) -> Result -//! { -//! // WebSocket handshake -//! let resp = ws::handshake(&req)?; -//! // Send handshake response to peer -//! ctx.start(resp); -//! // Map Payload into WsStream -//! ctx.add_stream(ws::WsStream::new(&mut req)); -//! // Start ws messages processing -//! ctx.reply(WsRoute) -//! } -//! } -//! //! // Define Handler for ws::Message message //! impl StreamHandler for WsRoute {} //! @@ -59,13 +47,13 @@ use http::{Method, StatusCode, header}; use bytes::BytesMut; use futures::{Async, Poll, Stream}; -use actix::{Actor, ResponseType}; +use actix::{Actor, AsyncContext, ResponseType, StreamHandler}; use body::Body; use context::HttpContext; -use route::Route; +use route::Reply; use payload::Payload; -use error::WsHandshakeError; +use error::{Error, WsHandshakeError}; use httprequest::HttpRequest; use httpresponse::{ConnectionType, HttpResponse}; @@ -100,6 +88,19 @@ impl ResponseType for Message { type Error = (); } +pub fn start(mut req: HttpRequest, actor: A) -> Result + where A: Actor> + StreamHandler, + S: 'static +{ + let resp = handshake(&req)?; + + let stream = WsStream::new(&mut req); + let mut ctx = HttpContext::new(req, actor); + ctx.start(resp); + ctx.add_stream(stream); + Ok(ctx.into()) +} + /// Prepare `WebSocket` handshake response. /// /// This function returns handshake `HttpResponse`, ready to send to peer. @@ -271,8 +272,8 @@ pub struct WsWriter; impl WsWriter { /// Send text frame - pub fn text(ctx: &mut HttpContext, text: &str) - where A: Actor> + Route + pub fn text(ctx: &mut HttpContext, text: &str) + where A: Actor> { let mut frame = wsframe::Frame::message(Vec::from(text), OpCode::Text, true); let mut buf = Vec::new(); @@ -282,8 +283,8 @@ impl WsWriter { } /// Send binary frame - pub fn binary(ctx: &mut HttpContext, data: Vec) - where A: Actor> + Route + pub fn binary(ctx: &mut HttpContext, data: Vec) + where A: Actor> { let mut frame = wsframe::Frame::message(data, OpCode::Binary, true); let mut buf = Vec::new(); @@ -293,8 +294,8 @@ impl WsWriter { } /// Send ping frame - pub fn ping(ctx: &mut HttpContext, message: &str) - where A: Actor> + Route + pub fn ping(ctx: &mut HttpContext, message: &str) + where A: Actor> { let mut frame = wsframe::Frame::message(Vec::from(message), OpCode::Ping, true); let mut buf = Vec::new(); @@ -304,8 +305,8 @@ impl WsWriter { } /// Send pong frame - pub fn pong(ctx: &mut HttpContext, message: &str) - where A: Actor> + Route + pub fn pong(ctx: &mut HttpContext, message: &str) + where A: Actor> { let mut frame = wsframe::Frame::message(Vec::from(message), OpCode::Pong, true); let mut buf = Vec::new(); @@ -315,8 +316,8 @@ impl WsWriter { } /// Send close frame - pub fn close(ctx: &mut HttpContext, code: CloseCode, reason: &str) - where A: Actor> + Route + pub fn close(ctx: &mut HttpContext, code: CloseCode, reason: &str) + where A: Actor> { let mut frame = wsframe::Frame::close(code, reason); let mut buf = Vec::new();