diff --git a/src/app.rs b/src/app.rs index fc336e857..fcb491a21 100644 --- a/src/app.rs +++ b/src/app.rs @@ -458,11 +458,12 @@ where Error = Error, InitError = (), >, + T::Future: 'static, { fn into_factory(self) -> AppInit { AppInit { - data: self.data.into_boxed_slice().into(), - data_factories: self.data_factories.into_boxed_slice().into(), + data_factories: self.data.into_boxed_slice().into(), + async_data_factories: self.data_factories.into_boxed_slice().into(), endpoint: self.endpoint, services: Rc::new(RefCell::new(self.services)), external: RefCell::new(self.external), diff --git a/src/app_service.rs b/src/app_service.rs index 686be6312..96040b8fb 100644 --- a/src/app_service.rs +++ b/src/app_service.rs @@ -1,15 +1,15 @@ use std::cell::RefCell; use std::future::Future; -use std::marker::PhantomData; use std::pin::Pin; use std::rc::Rc; use std::task::{Context, Poll}; use actix_http::{Extensions, Request, Response}; -use actix_router::{Path, ResourceDef, ResourceInfo, Router, Url}; +use actix_router::{Path, ResourceDef, Router, Url}; use actix_service::boxed::{self, BoxService, BoxServiceFactory}; use actix_service::{fn_service, Service, ServiceFactory}; -use futures_util::future::{join_all, ok, FutureExt, LocalBoxFuture}; +use futures_core::future::LocalBoxFuture; +use futures_util::future::join_all; use crate::config::{AppConfig, AppService}; use crate::data::{DataFactory, FnDataFactory}; @@ -22,7 +22,6 @@ use crate::service::{AppServiceFactory, ServiceRequest, ServiceResponse}; type Guards = Vec>; type HttpService = BoxService; type HttpNewService = BoxServiceFactory<(), ServiceRequest, ServiceResponse, Error, ()>; -type BoxResponse = LocalBoxFuture<'static, Result>; /// Service factory to convert `Request` to a `ServiceRequest`. /// It also executes data factories. @@ -38,8 +37,8 @@ where { pub(crate) endpoint: T, pub(crate) extensions: RefCell>, - pub(crate) data: Rc<[Box]>, - pub(crate) data_factories: Rc<[FnDataFactory]>, + pub(crate) data_factories: Rc<[Box]>, + pub(crate) async_data_factories: Rc<[FnDataFactory]>, pub(crate) services: Rc>>>, pub(crate) default: Option>, pub(crate) factory_ref: Rc>>, @@ -55,24 +54,26 @@ where Error = Error, InitError = (), >, + T::Future: 'static, { type Response = ServiceResponse; type Error = T::Error; type Config = AppConfig; type Service = AppInitService; type InitError = T::InitError; - type Future = AppInitResult; + type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, config: AppConfig) -> Self::Future { // update resource default service let default = self.default.clone().unwrap_or_else(|| { - Rc::new(boxed::factory(fn_service(|req: ServiceRequest| { - ok(req.into_response(Response::NotFound().finish())) + Rc::new(boxed::factory(fn_service(|req: ServiceRequest| async { + Ok(req.into_response(Response::NotFound().finish())) }))) }); // App config - let mut config = AppService::new(config, default.clone(), self.data.clone()); + let mut config = + AppService::new(config, default.clone(), self.data_factories.clone()); // register services std::mem::take(&mut *self.services.borrow_mut()) @@ -83,7 +84,7 @@ where let (config, services) = config.into_services(); - // complete pipeline creation + // complete pipeline creation. *self.factory_ref.borrow_mut() = Some(AppRoutingFactory { default, services: services @@ -106,107 +107,48 @@ where let rmap = Rc::new(rmap); rmap.finish(rmap.clone()); - // start all data factory futures - let factory_futs = join_all(self.data_factories.iter().map(|f| f())); + // construct all async data factory futures + let factory_futs = join_all(self.async_data_factories.iter().map(|f| f())); - AppInitResult { - endpoint: None, - endpoint_fut: self.endpoint.new_service(()), - data: self.data.clone(), - data_factories: None, - data_factories_fut: factory_futs.boxed_local(), - extensions: Some( - self.extensions - .borrow_mut() - .take() - .unwrap_or_else(Extensions::new), - ), - config, - rmap, - _phantom: PhantomData, - } - } -} + // construct app service and middleware service factory future. + let endpoint_fut = self.endpoint.new_service(()); -#[pin_project::pin_project] -pub struct AppInitResult -where - T: ServiceFactory, -{ - #[pin] - endpoint_fut: T::Future, - // a Some signals completion of endpoint creation - endpoint: Option, + // take extensions or create new one as app data container. + let mut app_data = self + .extensions + .borrow_mut() + .take() + .unwrap_or_else(Extensions::new); - #[pin] - data_factories_fut: LocalBoxFuture<'static, Vec, ()>>>, - // a Some signals completion of factory futures - data_factories: Option>>, + let data_factories = self.data_factories.clone(); - rmap: Rc, - config: AppConfig, - data: Rc<[Box]>, - extensions: Option, + Box::pin(async move { + // async data factories + let async_data_factories = factory_futs + .await + .into_iter() + .collect::, _>>() + .map_err(|_| ())?; - _phantom: PhantomData, -} + // app service and middleware + let service = endpoint_fut.await?; -impl Future for AppInitResult -where - T: ServiceFactory< - ServiceRequest, - Config = (), - Response = ServiceResponse, - Error = Error, - InitError = (), - >, -{ - type Output = Result, ()>; + // populate app data container from (async) data factories. + data_factories + .iter() + .chain(&async_data_factories) + .for_each(|factory| { + factory.create(&mut app_data); + }); - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - - // async data factories - if let Poll::Ready(factories) = this.data_factories_fut.poll(cx) { - let factories: Result, ()> = factories.into_iter().collect(); - - if let Ok(factories) = factories { - this.data_factories.replace(factories); - } else { - return Poll::Ready(Err(())); - } - } - - // app service and middleware - if this.endpoint.is_none() { - if let Poll::Ready(srv) = this.endpoint_fut.poll(cx)? { - *this.endpoint = Some(srv); - } - } - - // not using if let so condition only needs shared ref - if this.endpoint.is_some() && this.data_factories.is_some() { - // create app data container - let mut data = this.extensions.take().unwrap(); - - for f in this.data.iter() { - f.create(&mut data); - } - - for f in this.data_factories.take().unwrap().iter() { - f.create(&mut data); - } - - return Poll::Ready(Ok(AppInitService { - service: this.endpoint.take().unwrap(), - rmap: this.rmap.clone(), - config: this.config.clone(), - data: Rc::new(data), + Ok(AppInitService { + service, + rmap, + config, + app_data: Rc::new(app_data), pool: HttpRequestPool::create(), - })); - } - - Poll::Pending + }) + }) } } @@ -218,7 +160,7 @@ where service: T, rmap: Rc, config: AppConfig, - data: Rc, + app_data: Rc, pool: &'static HttpRequestPool, } @@ -251,7 +193,7 @@ where payload, self.rmap.clone(), self.config.clone(), - self.data.clone(), + self.app_data.clone(), self.pool, ) }; @@ -290,7 +232,7 @@ impl ServiceFactory for AppRoutingFactory { CreateAppRoutingItem::Future( Some(path.clone()), guards.borrow_mut().take(), - service.new_service(()).boxed_local(), + Box::pin(service.new_service(())), ) }) .collect(), @@ -307,7 +249,7 @@ type HttpServiceFut = LocalBoxFuture<'static, Result>; pub struct AppRoutingFactoryResponse { fut: Vec, default: Option, - default_fut: Option>>, + default_fut: Option, } enum CreateAppRoutingItem { @@ -367,7 +309,6 @@ impl Future for AppRoutingFactoryResponse { router }); Poll::Ready(Ok(AppRouting { - ready: None, router: router.finish(), default: self.default.take(), })) @@ -379,22 +320,15 @@ impl Future for AppRoutingFactoryResponse { pub struct AppRouting { router: Router, - ready: Option<(ServiceRequest, ResourceInfo)>, default: Option, } impl Service for AppRouting { type Response = ServiceResponse; type Error = Error; - type Future = BoxResponse; + type Future = LocalBoxFuture<'static, Result>; - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - if self.ready.is_none() { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } - } + actix_service::always_ready!(); fn call(&mut self, mut req: ServiceRequest) -> Self::Future { let res = self.router.recognize_mut_checked(&mut req, |req, guards| { @@ -414,7 +348,9 @@ impl Service for AppRouting { default.call(req) } else { let req = req.into_parts().0; - ok(ServiceResponse::new(req, Response::NotFound().finish())).boxed_local() + Box::pin(async { + Ok(ServiceResponse::new(req, Response::NotFound().finish())) + }) } } } @@ -431,11 +367,11 @@ impl AppEntry { } impl ServiceFactory for AppEntry { - type Config = (); type Response = ServiceResponse; type Error = Error; - type InitError = (); + type Config = (); type Service = AppRouting; + type InitError = (); type Future = AppRoutingFactoryResponse; fn new_service(&self, _: ()) -> Self::Future {