mirror of
https://github.com/fafhrd91/actix-web
synced 2024-11-24 08:22:59 +01:00
simplify data factory future polling (#1473)
Co-authored-by: Yuki Okushi <huyuumi.dev@gmail.com>
This commit is contained in:
parent
b047413b39
commit
bb17280f51
15
src/app.rs
15
src/app.rs
@ -476,13 +476,13 @@ where
|
||||
mod tests {
|
||||
use actix_service::Service;
|
||||
use bytes::Bytes;
|
||||
use futures::future::ok;
|
||||
use futures::future::{ok, err};
|
||||
|
||||
use super::*;
|
||||
use crate::http::{header, HeaderValue, Method, StatusCode};
|
||||
use crate::middleware::DefaultHeaders;
|
||||
use crate::service::ServiceRequest;
|
||||
use crate::test::{call_service, init_service, read_body, TestRequest};
|
||||
use crate::test::{call_service, init_service, try_init_service, read_body, TestRequest};
|
||||
use crate::{web, HttpRequest, HttpResponse};
|
||||
|
||||
#[actix_rt::test]
|
||||
@ -551,6 +551,17 @@ mod tests {
|
||||
assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR);
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_data_factory_errors() {
|
||||
let srv =
|
||||
try_init_service(App::new().data_factory(|| err::<u32, _>(())).service(
|
||||
web::resource("/").to(|_: web::Data<usize>| HttpResponse::Ok()),
|
||||
))
|
||||
.await;
|
||||
|
||||
assert!(srv.is_err());
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_extension() {
|
||||
let mut srv = init_service(App::new().app_data(10usize).service(
|
||||
|
@ -9,7 +9,7 @@ use actix_http::{Extensions, Request, Response};
|
||||
use actix_router::{Path, ResourceDef, ResourceInfo, Router, Url};
|
||||
use actix_service::boxed::{self, BoxService, BoxServiceFactory};
|
||||
use actix_service::{fn_service, Service, ServiceFactory};
|
||||
use futures::future::{ok, FutureExt, LocalBoxFuture};
|
||||
use futures::future::{join_all, ok, FutureExt, LocalBoxFuture};
|
||||
|
||||
use crate::config::{AppConfig, AppService};
|
||||
use crate::data::DataFactory;
|
||||
@ -109,12 +109,15 @@ where
|
||||
let rmap = Rc::new(rmap);
|
||||
rmap.finish(rmap.clone());
|
||||
|
||||
// start all data factory futures
|
||||
let factory_futs = join_all(self.data_factories.iter().map(|f| f()));
|
||||
|
||||
AppInitResult {
|
||||
endpoint: None,
|
||||
endpoint_fut: self.endpoint.new_service(()),
|
||||
data: self.data.clone(),
|
||||
data_factories: Vec::new(),
|
||||
data_factories_fut: self.data_factories.iter().map(|f| f()).collect(),
|
||||
data_factories: None,
|
||||
data_factories_fut: factory_futs.boxed_local(),
|
||||
extensions: Some(
|
||||
self.extensions
|
||||
.borrow_mut()
|
||||
@ -133,15 +136,21 @@ pub struct AppInitResult<T, B>
|
||||
where
|
||||
T: ServiceFactory,
|
||||
{
|
||||
endpoint: Option<T::Service>,
|
||||
#[pin]
|
||||
endpoint_fut: T::Future,
|
||||
// a Some signals completion of endpoint creation
|
||||
endpoint: Option<T::Service>,
|
||||
|
||||
#[pin]
|
||||
data_factories_fut: LocalBoxFuture<'static, Vec<Result<Box<dyn DataFactory>, ()>>>,
|
||||
// a Some signals completion of factory futures
|
||||
data_factories: Option<Vec<Box<dyn DataFactory>>>,
|
||||
|
||||
rmap: Rc<ResourceMap>,
|
||||
config: AppConfig,
|
||||
data: Rc<Vec<Box<dyn DataFactory>>>,
|
||||
data_factories: Vec<Box<dyn DataFactory>>,
|
||||
data_factories_fut: Vec<LocalBoxFuture<'static, Result<Box<dyn DataFactory>, ()>>>,
|
||||
extensions: Option<Extensions>,
|
||||
|
||||
_t: PhantomData<B>,
|
||||
}
|
||||
|
||||
@ -161,44 +170,46 @@ where
|
||||
let this = self.project();
|
||||
|
||||
// async data factories
|
||||
let mut idx = 0;
|
||||
while idx < this.data_factories_fut.len() {
|
||||
match Pin::new(&mut this.data_factories_fut[idx]).poll(cx)? {
|
||||
Poll::Ready(f) => {
|
||||
this.data_factories.push(f);
|
||||
let _ = this.data_factories_fut.remove(idx);
|
||||
}
|
||||
Poll::Pending => idx += 1,
|
||||
if let Poll::Ready(factories) = this.data_factories_fut.poll(cx) {
|
||||
let factories: Result<Vec<_>, ()> = factories.into_iter().collect();
|
||||
|
||||
if let Ok(factories) = factories {
|
||||
this.data_factories.replace(factories);
|
||||
} else {
|
||||
return Poll::Ready(Err(()));
|
||||
}
|
||||
}
|
||||
|
||||
// app service and middleware
|
||||
if this.endpoint.is_none() {
|
||||
if let Poll::Ready(srv) = this.endpoint_fut.poll(cx)? {
|
||||
*this.endpoint = Some(srv);
|
||||
}
|
||||
}
|
||||
|
||||
if this.endpoint.is_some() && this.data_factories_fut.is_empty() {
|
||||
// not using if let so condition only needs shared ref
|
||||
if this.endpoint.is_some() && this.data_factories.is_some() {
|
||||
// create app data container
|
||||
let mut data = this.extensions.take().unwrap();
|
||||
|
||||
for f in this.data.iter() {
|
||||
f.create(&mut data);
|
||||
}
|
||||
|
||||
for f in this.data_factories.iter() {
|
||||
for f in this.data_factories.take().unwrap().iter() {
|
||||
f.create(&mut data);
|
||||
}
|
||||
|
||||
Poll::Ready(Ok(AppInitService {
|
||||
return Poll::Ready(Ok(AppInitService {
|
||||
service: this.endpoint.take().unwrap(),
|
||||
rmap: this.rmap.clone(),
|
||||
config: this.config.clone(),
|
||||
data: Rc::new(data),
|
||||
pool: HttpRequestPool::create(),
|
||||
}))
|
||||
} else {
|
||||
Poll::Pending
|
||||
}));
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
|
44
src/test.rs
44
src/test.rs
@ -78,6 +78,26 @@ pub fn default_service(
|
||||
pub async fn init_service<R, S, B, E>(
|
||||
app: R,
|
||||
) -> impl Service<Request = Request, Response = ServiceResponse<B>, Error = E>
|
||||
where
|
||||
R: IntoServiceFactory<S>,
|
||||
S: ServiceFactory<
|
||||
Config = AppConfig,
|
||||
Request = Request,
|
||||
Response = ServiceResponse<B>,
|
||||
Error = E,
|
||||
>,
|
||||
S::InitError: std::fmt::Debug,
|
||||
{
|
||||
try_init_service(app).await.expect("service initilization failed")
|
||||
}
|
||||
|
||||
/// Fallible version of init_service that allows testing data factory errors.
|
||||
pub(crate) async fn try_init_service<R, S, B, E>(
|
||||
app: R,
|
||||
) -> Result<
|
||||
impl Service<Request = Request, Response = ServiceResponse<B>, Error = E>,
|
||||
S::InitError,
|
||||
>
|
||||
where
|
||||
R: IntoServiceFactory<S>,
|
||||
S: ServiceFactory<
|
||||
@ -89,7 +109,7 @@ where
|
||||
S::InitError: std::fmt::Debug,
|
||||
{
|
||||
let srv = app.into_factory();
|
||||
srv.new_service(AppConfig::default()).await.unwrap()
|
||||
srv.new_service(AppConfig::default()).await
|
||||
}
|
||||
|
||||
/// Calls service and waits for response future completion.
|
||||
@ -580,7 +600,7 @@ impl TestRequest {
|
||||
pub async fn send_request<S, B, E>(self, app: &mut S) -> S::Response
|
||||
where
|
||||
S: Service<Request = Request, Response = ServiceResponse<B>, Error = E>,
|
||||
E: std::fmt::Debug
|
||||
E: std::fmt::Debug,
|
||||
{
|
||||
let req = self.to_request();
|
||||
call_service(app, req).await
|
||||
@ -1125,8 +1145,8 @@ mod tests {
|
||||
#[actix_rt::test]
|
||||
async fn test_response_json() {
|
||||
let mut app = init_service(App::new().service(web::resource("/people").route(
|
||||
web::post().to(|person: web::Json<Person>| {
|
||||
async { HttpResponse::Ok().json(person.into_inner()) }
|
||||
web::post().to(|person: web::Json<Person>| async {
|
||||
HttpResponse::Ok().json(person.into_inner())
|
||||
}),
|
||||
)))
|
||||
.await;
|
||||
@ -1146,8 +1166,8 @@ mod tests {
|
||||
#[actix_rt::test]
|
||||
async fn test_body_json() {
|
||||
let mut app = init_service(App::new().service(web::resource("/people").route(
|
||||
web::post().to(|person: web::Json<Person>| {
|
||||
async { HttpResponse::Ok().json(person.into_inner()) }
|
||||
web::post().to(|person: web::Json<Person>| async {
|
||||
HttpResponse::Ok().json(person.into_inner())
|
||||
}),
|
||||
)))
|
||||
.await;
|
||||
@ -1168,8 +1188,8 @@ mod tests {
|
||||
#[actix_rt::test]
|
||||
async fn test_request_response_form() {
|
||||
let mut app = init_service(App::new().service(web::resource("/people").route(
|
||||
web::post().to(|person: web::Form<Person>| {
|
||||
async { HttpResponse::Ok().json(person.into_inner()) }
|
||||
web::post().to(|person: web::Form<Person>| async {
|
||||
HttpResponse::Ok().json(person.into_inner())
|
||||
}),
|
||||
)))
|
||||
.await;
|
||||
@ -1194,8 +1214,8 @@ mod tests {
|
||||
#[actix_rt::test]
|
||||
async fn test_request_response_json() {
|
||||
let mut app = init_service(App::new().service(web::resource("/people").route(
|
||||
web::post().to(|person: web::Json<Person>| {
|
||||
async { HttpResponse::Ok().json(person.into_inner()) }
|
||||
web::post().to(|person: web::Json<Person>| async {
|
||||
HttpResponse::Ok().json(person.into_inner())
|
||||
}),
|
||||
)))
|
||||
.await;
|
||||
@ -1259,7 +1279,7 @@ mod tests {
|
||||
assert!(res.status().is_success());
|
||||
}
|
||||
|
||||
/*
|
||||
/*
|
||||
|
||||
Comment out until actix decoupled of actix-http:
|
||||
https://github.com/actix/actix/issues/321
|
||||
@ -1307,5 +1327,5 @@ mod tests {
|
||||
let res = app.call(req).await.unwrap();
|
||||
assert!(res.status().is_success());
|
||||
}
|
||||
*/
|
||||
*/
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user