From cc20fee62884d70990fbf0d0b4013172d8e2759d Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sat, 2 Mar 2019 11:53:05 -0800 Subject: [PATCH] add request chain services --- src/app.rs | 480 ++++++++++++++++++++++++++----------- src/blocking.rs | 12 +- src/extractor.rs | 1 + src/lib.rs | 4 +- src/middleware/compress.rs | 3 - src/route.rs | 10 +- 6 files changed, 360 insertions(+), 150 deletions(-) diff --git a/src/app.rs b/src/app.rs index 7c077706..78f718db 100644 --- a/src/app.rs +++ b/src/app.rs @@ -2,7 +2,7 @@ use std::cell::RefCell; use std::marker::PhantomData; use std::rc::Rc; -use actix_http::body::{Body, MessageBody}; +use actix_http::body::MessageBody; use actix_http::{Extensions, PayloadStream, Request, Response}; use actix_router::{Path, ResourceDef, ResourceInfo, Router, Url}; use actix_service::boxed::{self, BoxedNewService, BoxedService}; @@ -30,32 +30,45 @@ pub trait HttpServiceFactory { } /// Application builder -pub struct App { - services: Vec<(ResourceDef, HttpNewService

)>, - default: Option>>, - defaults: Vec>>>>>, - endpoint: T, - factory_ref: Rc>>>, +pub struct App +where + T: NewService, Response = ServiceRequest

>, +{ + chain: T, extensions: Extensions, state: Vec>, - _t: PhantomData<(P, B)>, + _t: PhantomData<(P,)>, } -impl App> { - /// Create application with empty state. Application can +impl App { + /// Create application builder with empty state. Application can /// be configured with a builder-like pattern. pub fn new() -> Self { - App::create() + App { + chain: AppChain, + extensions: Extensions::new(), + state: Vec::new(), + _t: PhantomData, + } } } -impl Default for App> { +impl Default for App { fn default() -> Self { App::new() } } -impl App> { +impl App +where + P: 'static, + T: NewService< + Request = ServiceRequest, + Response = ServiceRequest

, + Error = (), + InitError = (), + >, +{ /// Create application with specified state. Application can be /// configured with a builder-like pattern. /// @@ -86,38 +99,172 @@ impl App> { self } - fn create() -> Self { + /// Configure resource for a specific path. + /// + /// Resources may have variable path segments. For example, a + /// resource with the path `/a/{name}/c` would match all incoming + /// requests with paths such as `/a/b/c`, `/a/1/c`, or `/a/etc/c`. + /// + /// A variable segment is specified in the form `{identifier}`, + /// where the identifier can be used later in a request handler to + /// access the matched value for that segment. This is done by + /// looking up the identifier in the `Params` object returned by + /// `HttpRequest.match_info()` method. + /// + /// By default, each segment matches the regular expression `[^{}/]+`. + /// + /// You can also specify a custom regex in the form `{identifier:regex}`: + /// + /// For instance, to route `GET`-requests on any route matching + /// `/users/{userid}/{friend}` and store `userid` and `friend` in + /// the exposed `Params` object: + /// + /// ```rust,ignore + /// # extern crate actix_web; + /// use actix_web::{http, App, HttpResponse}; + /// + /// fn main() { + /// let app = App::new().resource("/users/{userid}/{friend}", |r| { + /// r.get(|r| r.to(|_| HttpResponse::Ok())); + /// r.head(|r| r.to(|_| HttpResponse::MethodNotAllowed())) + /// }); + /// } + /// ``` + pub fn resource(self, path: &str, f: F) -> AppRouter> + where + F: FnOnce(Resource

) -> Resource, + U: NewService< + Request = ServiceRequest

, + Response = ServiceResponse, + Error = (), + InitError = (), + > + 'static, + { + let rdef = ResourceDef::new(path); + let resource = f(Resource::new()); + let default = resource.get_default(); + let fref = Rc::new(RefCell::new(None)); + AppRouter { + chain: self.chain, + services: vec![(rdef, boxed::new_service(resource.into_new_service()))], + default: None, + defaults: vec![default], + endpoint: AppEntry::new(fref.clone()), + factory_ref: fref, + extensions: self.extensions, + state: self.state, + _t: PhantomData, + } + } + + /// Register a middleware. + pub fn middleware( + self, + mw: F, + ) -> AppRouter< + T, + P, + B, + impl NewService< + Request = ServiceRequest

, + Response = ServiceResponse, + Error = (), + InitError = (), + >, + > + where + M: NewTransform< + AppService

, + Request = ServiceRequest

, + Response = ServiceResponse, + Error = (), + InitError = (), + >, + B: MessageBody, + F: IntoNewTransform>, + { + let fref = Rc::new(RefCell::new(None)); + let endpoint = ApplyNewService::new(mw, AppEntry::new(fref.clone())); + AppRouter { + endpoint, + chain: self.chain, + state: self.state, + services: Vec::new(), + default: None, + defaults: Vec::new(), + factory_ref: fref, + extensions: self.extensions, + _t: PhantomData, + } + } + + /// Register a request modifier. It can modify any request parameters + /// including payload stream. + pub fn chain( + self, + chain: C, + ) -> App< + P1, + impl NewService< + Request = ServiceRequest, + Response = ServiceRequest, + Error = (), + InitError = (), + >, + > + where + C: NewService< + (), + Request = ServiceRequest

, + Response = ServiceRequest, + Error = (), + InitError = (), + >, + F: IntoNewService, + { + let chain = self.chain.and_then(chain.into_new_service()); App { + chain, + state: self.state, + extensions: self.extensions, + _t: PhantomData, + } + } + + /// Complete applicatin chain configuration and start resource + /// configuration. + pub fn router(self) -> AppRouter> { + let fref = Rc::new(RefCell::new(None)); + AppRouter { + chain: self.chain, services: Vec::new(), default: None, defaults: Vec::new(), endpoint: AppEntry::new(fref.clone()), factory_ref: fref, - extensions: Extensions::new(), - state: Vec::new(), + extensions: self.extensions, + state: self.state, _t: PhantomData, } } } -// /// Application router builder -// pub struct AppRouter { -// services: Vec<( -// ResourceDef, -// BoxedHttpNewService, Response>, -// )>, -// default: Option, Response>>>, -// defaults: -// Vec, Response>>>>>>, -// state: AppState, -// endpoint: T, -// factory_ref: Rc>>>, -// extensions: Extensions, -// _t: PhantomData

, -// } +/// Structure that follows the builder pattern for building application +/// instances. +pub struct AppRouter { + chain: C, + services: Vec<(ResourceDef, HttpNewService

)>, + default: Option>>, + defaults: Vec>>>>>, + endpoint: T, + factory_ref: Rc>>>, + extensions: Extensions, + state: Vec>, + _t: PhantomData<(P, B)>, +} -impl App +impl AppRouter where P: 'static, B: MessageBody, @@ -221,7 +368,8 @@ where pub fn middleware( self, mw: F, - ) -> App< + ) -> AppRouter< + C, P, B1, impl NewService< @@ -243,14 +391,15 @@ where F: IntoNewTransform, { let endpoint = ApplyNewService::new(mw, self.endpoint); - App { + AppRouter { endpoint, + chain: self.chain, state: self.state, services: self.services, default: self.default, - defaults: Vec::new(), + defaults: self.defaults, factory_ref: self.factory_ref, - extensions: Extensions::new(), + extensions: self.extensions, _t: PhantomData, } } @@ -292,8 +441,8 @@ where } } -impl - IntoNewService, T, ()>> for App +impl + IntoNewService, T, ()>> for AppRouter where T: NewService< Request = ServiceRequest

, @@ -301,8 +450,14 @@ where Error = (), InitError = (), >, + C: NewService< + Request = ServiceRequest, + Response = ServiceRequest

, + Error = (), + InitError = (), + >, { - fn into_new_service(self) -> AndThenNewService, T, ()> { + fn into_new_service(self) -> AndThenNewService, T, ()> { // update resource default service if self.default.is_some() { for default in &self.defaults { @@ -317,99 +472,15 @@ where services: Rc::new(self.services), }); - AppStateFactory { + AppInit { + chain: self.chain, state: self.state, extensions: Rc::new(RefCell::new(Rc::new(self.extensions))), - _t: PhantomData, } .and_then(self.endpoint) } } -/// Service factory to convert `Request` to a `ServiceRequest` -pub struct AppStateFactory

{ - state: Vec>, - extensions: Rc>>, - _t: PhantomData

, -} - -impl NewService for AppStateFactory

{ - type Request = Request

; - type Response = ServiceRequest

; - type Error = (); - type InitError = (); - type Service = AppStateService

; - type Future = AppStateFactoryResult

; - - fn new_service(&self, _: &()) -> Self::Future { - AppStateFactoryResult { - state: self.state.iter().map(|s| s.construct()).collect(), - extensions: self.extensions.clone(), - _t: PhantomData, - } - } -} - -#[doc(hidden)] -pub struct AppStateFactoryResult

{ - state: Vec>, - extensions: Rc>>, - _t: PhantomData

, -} - -impl

Future for AppStateFactoryResult

{ - type Item = AppStateService

; - type Error = (); - - fn poll(&mut self) -> Poll { - if let Some(extensions) = Rc::get_mut(&mut *self.extensions.borrow_mut()) { - let mut idx = 0; - while idx < self.state.len() { - if let Async::Ready(_) = self.state[idx].poll_result(extensions)? { - self.state.remove(idx); - } else { - idx += 1; - } - } - if !self.state.is_empty() { - return Ok(Async::NotReady); - } - } else { - log::warn!("Multiple copies of app extensions exists"); - } - - Ok(Async::Ready(AppStateService { - extensions: self.extensions.borrow().clone(), - _t: PhantomData, - })) - } -} - -/// Service to convert `Request` to a `ServiceRequest` -pub struct AppStateService

{ - extensions: Rc, - _t: PhantomData

, -} - -impl

Service for AppStateService

{ - type Request = Request

; - type Response = ServiceRequest

; - type Error = (); - type Future = FutureResult; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) - } - - fn call(&mut self, req: Request

) -> Self::Future { - ok(ServiceRequest::new( - Path::new(Url::new(req.uri().clone())), - req, - self.extensions.clone(), - )) - } -} - pub struct AppFactory

{ services: Rc)>>, } @@ -530,17 +601,6 @@ impl

Service for AppService

{ } } -pub struct AppServiceResponse(Box>); - -impl Future for AppServiceResponse { - type Item = ServiceResponse; - type Error = (); - - fn poll(&mut self) -> Poll { - self.0.poll().map_err(|_| panic!()) - } -} - #[doc(hidden)] pub struct AppEntry

{ factory: Rc>>>, @@ -564,3 +624,151 @@ impl NewService for AppEntry

{ self.factory.borrow_mut().as_mut().unwrap().new_service(&()) } } + +#[doc(hidden)] +pub struct AppChain; + +impl NewService<()> for AppChain { + type Request = ServiceRequest; + type Response = ServiceRequest; + type Error = (); + type InitError = (); + type Service = AppChain; + type Future = FutureResult; + + fn new_service(&self, _: &()) -> Self::Future { + ok(AppChain) + } +} + +impl Service for AppChain { + type Request = ServiceRequest; + type Response = ServiceRequest; + type Error = (); + type Future = FutureResult; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(Async::Ready(())) + } + + fn call(&mut self, req: Self::Request) -> Self::Future { + ok(req) + } +} + +/// Service factory to convert `Request` to a `ServiceRequest` +pub struct AppInit +where + C: NewService, Response = ServiceRequest

>, +{ + chain: C, + state: Vec>, + extensions: Rc>>, +} + +impl NewService for AppInit +where + C: NewService< + Request = ServiceRequest, + Response = ServiceRequest

, + InitError = (), + >, +{ + type Request = Request; + type Response = ServiceRequest

; + type Error = C::Error; + type InitError = C::InitError; + type Service = AppInitService; + type Future = AppInitResult; + + fn new_service(&self, _: &()) -> Self::Future { + AppInitResult { + chain: self.chain.new_service(&()), + state: self.state.iter().map(|s| s.construct()).collect(), + extensions: self.extensions.clone(), + } + } +} + +#[doc(hidden)] +pub struct AppInitResult +where + C: NewService< + Request = ServiceRequest, + Response = ServiceRequest

, + InitError = (), + >, +{ + chain: C::Future, + state: Vec>, + extensions: Rc>>, +} + +impl Future for AppInitResult +where + C: NewService< + Request = ServiceRequest, + Response = ServiceRequest

, + InitError = (), + >, +{ + type Item = AppInitService; + type Error = C::InitError; + + fn poll(&mut self) -> Poll { + if let Some(extensions) = Rc::get_mut(&mut *self.extensions.borrow_mut()) { + let mut idx = 0; + while idx < self.state.len() { + if let Async::Ready(_) = self.state[idx].poll_result(extensions)? { + self.state.remove(idx); + } else { + idx += 1; + } + } + if !self.state.is_empty() { + return Ok(Async::NotReady); + } + } else { + log::warn!("Multiple copies of app extensions exists"); + } + + let chain = futures::try_ready!(self.chain.poll()); + + Ok(Async::Ready(AppInitService { + chain, + extensions: self.extensions.borrow().clone(), + })) + } +} + +/// Service to convert `Request` to a `ServiceRequest` +pub struct AppInitService +where + C: Service, Response = ServiceRequest

>, +{ + chain: C, + extensions: Rc, +} + +impl Service for AppInitService +where + C: Service, Response = ServiceRequest

>, +{ + type Request = Request; + type Response = ServiceRequest

; + type Error = C::Error; + type Future = C::Future; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.chain.poll_ready() + } + + fn call(&mut self, req: Request) -> Self::Future { + let req = ServiceRequest::new( + Path::new(Url::new(req.uri().clone())), + req, + self.extensions.clone(), + ); + self.chain.call(req) + } +} diff --git a/src/blocking.rs b/src/blocking.rs index fc2624f6..abf4282c 100644 --- a/src/blocking.rs +++ b/src/blocking.rs @@ -24,7 +24,7 @@ lazy_static::lazy_static! { Mutex::new( threadpool::Builder::new() .thread_name("actix-web".to_owned()) - .num_threads(8) + .num_threads(default) .build(), ) }; @@ -45,11 +45,15 @@ pub enum BlockingError { /// to result of the function execution. pub fn run(f: F) -> CpuFuture where - F: FnOnce() -> Result, + F: FnOnce() -> Result + Send + 'static, + I: Send + 'static, + E: Send + 'static, { let (tx, rx) = oneshot::channel(); - POOL.with(move |pool| { - let _ = tx.send(f()); + POOL.with(|pool| { + pool.execute(move || { + let _ = tx.send(f()); + }) }); CpuFuture { rx } diff --git a/src/extractor.rs b/src/extractor.rs index 53209ad0..48c70b86 100644 --- a/src/extractor.rs +++ b/src/extractor.rs @@ -1,3 +1,4 @@ +#![allow(dead_code)] use std::ops::{Deref, DerefMut}; use std::rc::Rc; use std::{fmt, str}; diff --git a/src/lib.rs b/src/lib.rs index 74fa0a94..c70f47e8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -#![allow(clippy::type_complexity, dead_code, unused_variables)] +#![allow(clippy::type_complexity)] mod app; mod extractor; @@ -28,7 +28,7 @@ pub use crate::service::{ServiceRequest, ServiceResponse}; pub use crate::state::State; pub mod dev { - pub use crate::app::AppService; + pub use crate::app::AppRouter; pub use crate::handler::{AsyncFactory, Extract, Factory, Handle}; pub use crate::route::{Route, RouteBuilder}; // pub use crate::info::ConnectionInfo; diff --git a/src/middleware/compress.rs b/src/middleware/compress.rs index fee17ce1..5d5586cf 100644 --- a/src/middleware/compress.rs +++ b/src/middleware/compress.rs @@ -117,7 +117,6 @@ where enum EncoderBody { Body(B), Other(Box), - None, } pub struct Encoder { @@ -131,7 +130,6 @@ impl MessageBody for Encoder { match self.body { EncoderBody::Body(ref b) => b.length(), EncoderBody::Other(ref b) => b.length(), - EncoderBody::None => BodyLength::Empty, } } else { BodyLength::Stream @@ -143,7 +141,6 @@ impl MessageBody for Encoder { let result = match self.body { EncoderBody::Body(ref mut b) => b.poll_next()?, EncoderBody::Other(ref mut b) => b.poll_next()?, - EncoderBody::None => return Ok(Async::Ready(None)), }; match result { Async::NotReady => return Ok(Async::NotReady), diff --git a/src/route.rs b/src/route.rs index 574e8e34..4abb2f1d 100644 --- a/src/route.rs +++ b/src/route.rs @@ -320,11 +320,11 @@ impl RouteBuilder

{ } } -pub struct RouteServiceBuilder { - service: T, - filters: Vec>, - _t: PhantomData<(P, U1, U2)>, -} +// pub struct RouteServiceBuilder { +// service: T, +// filters: Vec>, +// _t: PhantomData<(P, U1, U2)>, +// } // impl RouteServiceBuilder // where