mirror of
https://github.com/fafhrd91/actix-net
synced 2025-01-18 13:01:49 +01:00
service trait takes shared self reference (#247)
This commit is contained in:
parent
874e5f2e50
commit
636cef8868
@ -1,6 +1,9 @@
|
||||
# Changes
|
||||
|
||||
## Unreleased - 2021-xx-xx
|
||||
* Add `Router::recognize_checked` [#247]
|
||||
|
||||
[#247]: https://github.com/actix/actix-net/pull/247
|
||||
|
||||
|
||||
## 0.2.6 - 2021-01-09
|
||||
|
@ -45,6 +45,24 @@ impl<T, U> Router<T, U> {
|
||||
None
|
||||
}
|
||||
|
||||
pub fn recognize_checked<R, P, F>(
|
||||
&self,
|
||||
resource: &mut R,
|
||||
check: F,
|
||||
) -> Option<(&T, ResourceId)>
|
||||
where
|
||||
F: Fn(&R, &Option<U>) -> bool,
|
||||
R: Resource<P>,
|
||||
P: ResourcePath,
|
||||
{
|
||||
for item in self.0.iter() {
|
||||
if item.0.match_path_checked(resource, &check, &item.2) {
|
||||
return Some((&item.1, ResourceId(item.0.id())));
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
pub fn recognize_mut_checked<R, P, F>(
|
||||
&mut self,
|
||||
resource: &mut R,
|
||||
|
@ -36,6 +36,7 @@ slab = "0.4"
|
||||
tokio = { version = "1", features = ["sync"] }
|
||||
|
||||
[dev-dependencies]
|
||||
actix-rt = "2.0.0-beta.2"
|
||||
bytes = "1"
|
||||
env_logger = "0.8"
|
||||
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
|
||||
|
@ -58,11 +58,11 @@ where
|
||||
type Error = ();
|
||||
type Future = Ready<Result<(), ()>>;
|
||||
|
||||
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.service.poll_ready(ctx).map_err(|_| ())
|
||||
}
|
||||
|
||||
fn call(&mut self, (guard, req): (Option<CounterGuard>, MioStream)) -> Self::Future {
|
||||
fn call(&self, (guard, req): (Option<CounterGuard>, MioStream)) -> Self::Future {
|
||||
ready(match FromStream::from_mio(req) {
|
||||
Ok(stream) => {
|
||||
let f = self.service.call(stream);
|
||||
|
@ -1,6 +1,12 @@
|
||||
# Changes
|
||||
|
||||
## Unreleased - 2021-xx-xx
|
||||
* `Service::poll_ready` and `Service::call` take `&self`. [#247]
|
||||
* `apply_fn` and `apply_fn_factory` would take `Fn(Req, &Service)` function type [#247]
|
||||
* `apply_cfg` and `apply_cfg_factory` would take `Fn(Req, &Service)` function type [#247]
|
||||
* `fn_service` module would take `Fn(Req)` function type. [#247]
|
||||
|
||||
[#247]: https://github.com/actix/actix-net/pull/247
|
||||
|
||||
|
||||
## 2.0.0-beta.3 - 2021-01-09
|
||||
|
@ -1,12 +1,12 @@
|
||||
use alloc::rc::Rc;
|
||||
use core::{
|
||||
cell::RefCell,
|
||||
future::Future,
|
||||
marker::PhantomData,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use alloc::rc::Rc;
|
||||
use futures_core::ready;
|
||||
use pin_project_lite::pin_project;
|
||||
|
||||
use super::{Service, ServiceFactory};
|
||||
@ -15,7 +15,7 @@ use super::{Service, ServiceFactory};
|
||||
/// of another service which completes successfully.
|
||||
///
|
||||
/// This is created by the `Pipeline::and_then` method.
|
||||
pub(crate) struct AndThenService<A, B, Req>(Rc<RefCell<(A, B)>>, PhantomData<Req>);
|
||||
pub(crate) struct AndThenService<A, B, Req>(Rc<(A, B)>, PhantomData<Req>);
|
||||
|
||||
impl<A, B, Req> AndThenService<A, B, Req> {
|
||||
/// Create new `AndThen` combinator
|
||||
@ -24,7 +24,7 @@ impl<A, B, Req> AndThenService<A, B, Req> {
|
||||
A: Service<Req>,
|
||||
B: Service<A::Response, Error = A::Error>,
|
||||
{
|
||||
Self(Rc::new(RefCell::new((a, b))), PhantomData)
|
||||
Self(Rc::new((a, b)), PhantomData)
|
||||
}
|
||||
}
|
||||
|
||||
@ -43,20 +43,20 @@ where
|
||||
type Error = A::Error;
|
||||
type Future = AndThenServiceResponse<A, B, Req>;
|
||||
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
let mut srv = self.0.borrow_mut();
|
||||
let not_ready = !srv.0.poll_ready(cx)?.is_ready();
|
||||
if !srv.1.poll_ready(cx)?.is_ready() || not_ready {
|
||||
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
let (a, b) = &*self.0;
|
||||
let not_ready = !a.poll_ready(cx)?.is_ready();
|
||||
if !b.poll_ready(cx)?.is_ready() || not_ready {
|
||||
Poll::Pending
|
||||
} else {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Req) -> Self::Future {
|
||||
fn call(&self, req: Req) -> Self::Future {
|
||||
AndThenServiceResponse {
|
||||
state: State::A {
|
||||
fut: self.0.borrow_mut().0.call(req),
|
||||
fut: self.0 .0.call(req),
|
||||
b: Some(self.0.clone()),
|
||||
},
|
||||
}
|
||||
@ -84,13 +84,12 @@ pin_project! {
|
||||
A {
|
||||
#[pin]
|
||||
fut: A::Future,
|
||||
b: Option<Rc<RefCell<(A, B)>>>,
|
||||
b: Option<Rc<(A, B)>>,
|
||||
},
|
||||
B {
|
||||
#[pin]
|
||||
fut: B::Future,
|
||||
},
|
||||
Empty,
|
||||
}
|
||||
}
|
||||
|
||||
@ -105,23 +104,14 @@ where
|
||||
let mut this = self.as_mut().project();
|
||||
|
||||
match this.state.as_mut().project() {
|
||||
StateProj::A { fut, b } => match fut.poll(cx)? {
|
||||
Poll::Ready(res) => {
|
||||
let b = b.take().unwrap();
|
||||
this.state.set(State::Empty); // drop fut A
|
||||
let fut = b.borrow_mut().1.call(res);
|
||||
this.state.set(State::B { fut });
|
||||
self.poll(cx)
|
||||
}
|
||||
Poll::Pending => Poll::Pending,
|
||||
},
|
||||
StateProj::B { fut } => fut.poll(cx).map(|r| {
|
||||
this.state.set(State::Empty);
|
||||
r
|
||||
}),
|
||||
StateProj::Empty => {
|
||||
panic!("future must not be polled after it returned `Poll::Ready`")
|
||||
StateProj::A { fut, b } => {
|
||||
let res = ready!(fut.poll(cx))?;
|
||||
let b = b.take().unwrap();
|
||||
let fut = b.1.call(res);
|
||||
this.state.set(State::B { fut });
|
||||
self.poll(cx)
|
||||
}
|
||||
StateProj::B { fut } => fut.poll(cx),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -292,12 +282,12 @@ mod tests {
|
||||
type Error = ();
|
||||
type Future = Ready<Result<Self::Response, ()>>;
|
||||
|
||||
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.0.set(self.0.get() + 1);
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, req: &'static str) -> Self::Future {
|
||||
fn call(&self, req: &'static str) -> Self::Future {
|
||||
ok(req)
|
||||
}
|
||||
}
|
||||
@ -310,12 +300,12 @@ mod tests {
|
||||
type Error = ();
|
||||
type Future = Ready<Result<Self::Response, ()>>;
|
||||
|
||||
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.0.set(self.0.get() + 1);
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, req: &'static str) -> Self::Future {
|
||||
fn call(&self, req: &'static str) -> Self::Future {
|
||||
ok((req, "srv2"))
|
||||
}
|
||||
}
|
||||
@ -323,7 +313,7 @@ mod tests {
|
||||
#[actix_rt::test]
|
||||
async fn test_poll_ready() {
|
||||
let cnt = Rc::new(Cell::new(0));
|
||||
let mut srv = pipeline(Srv1(cnt.clone())).and_then(Srv2(cnt.clone()));
|
||||
let srv = pipeline(Srv1(cnt.clone())).and_then(Srv2(cnt.clone()));
|
||||
let res = lazy(|cx| srv.poll_ready(cx)).await;
|
||||
assert_eq!(res, Poll::Ready(Ok(())));
|
||||
assert_eq!(cnt.get(), 2);
|
||||
@ -332,7 +322,7 @@ mod tests {
|
||||
#[actix_rt::test]
|
||||
async fn test_call() {
|
||||
let cnt = Rc::new(Cell::new(0));
|
||||
let mut srv = pipeline(Srv1(cnt.clone())).and_then(Srv2(cnt));
|
||||
let srv = pipeline(Srv1(cnt.clone())).and_then(Srv2(cnt));
|
||||
let res = srv.call("srv1").await;
|
||||
assert!(res.is_ok());
|
||||
assert_eq!(res.unwrap(), ("srv1", "srv2"));
|
||||
@ -346,7 +336,7 @@ mod tests {
|
||||
pipeline_factory(fn_factory(move || ready(Ok::<_, ()>(Srv1(cnt2.clone())))))
|
||||
.and_then(move || ready(Ok(Srv2(cnt.clone()))));
|
||||
|
||||
let mut srv = new_srv.new_service(()).await.unwrap();
|
||||
let srv = new_srv.new_service(()).await.unwrap();
|
||||
let res = srv.call("srv1").await;
|
||||
assert!(res.is_ok());
|
||||
assert_eq!(res.unwrap(), ("srv1", "srv2"));
|
||||
|
@ -20,7 +20,7 @@ pub fn apply_fn<I, S, F, Fut, Req, In, Res, Err>(
|
||||
where
|
||||
I: IntoService<S, In>,
|
||||
S: Service<In, Error = Err>,
|
||||
F: FnMut(Req, &mut S) -> Fut,
|
||||
F: Fn(Req, &S) -> Fut,
|
||||
Fut: Future<Output = Result<Res, Err>>,
|
||||
{
|
||||
Apply::new(service.into_service(), wrap_fn)
|
||||
@ -36,7 +36,7 @@ pub fn apply_fn_factory<I, SF, F, Fut, Req, In, Res, Err>(
|
||||
where
|
||||
I: IntoServiceFactory<SF, In>,
|
||||
SF: ServiceFactory<In, Error = Err>,
|
||||
F: FnMut(Req, &mut SF::Service) -> Fut + Clone,
|
||||
F: Fn(Req, &SF::Service) -> Fut + Clone,
|
||||
Fut: Future<Output = Result<Res, Err>>,
|
||||
{
|
||||
ApplyFactory::new(service.into_factory(), f)
|
||||
@ -57,7 +57,7 @@ where
|
||||
impl<S, F, Fut, Req, In, Res, Err> Apply<S, F, Req, In, Res, Err>
|
||||
where
|
||||
S: Service<In, Error = Err>,
|
||||
F: FnMut(Req, &mut S) -> Fut,
|
||||
F: Fn(Req, &S) -> Fut,
|
||||
Fut: Future<Output = Result<Res, Err>>,
|
||||
{
|
||||
/// Create new `Apply` combinator
|
||||
@ -73,7 +73,7 @@ where
|
||||
impl<S, F, Fut, Req, In, Res, Err> Clone for Apply<S, F, Req, In, Res, Err>
|
||||
where
|
||||
S: Service<In, Error = Err> + Clone,
|
||||
F: FnMut(Req, &mut S) -> Fut + Clone,
|
||||
F: Fn(Req, &S) -> Fut + Clone,
|
||||
Fut: Future<Output = Result<Res, Err>>,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
@ -88,7 +88,7 @@ where
|
||||
impl<S, F, Fut, Req, In, Res, Err> Service<Req> for Apply<S, F, Req, In, Res, Err>
|
||||
where
|
||||
S: Service<In, Error = Err>,
|
||||
F: FnMut(Req, &mut S) -> Fut,
|
||||
F: Fn(Req, &S) -> Fut,
|
||||
Fut: Future<Output = Result<Res, Err>>,
|
||||
{
|
||||
type Response = Res;
|
||||
@ -97,8 +97,8 @@ where
|
||||
|
||||
crate::forward_ready!(service);
|
||||
|
||||
fn call(&mut self, req: Req) -> Self::Future {
|
||||
(self.wrap_fn)(req, &mut self.service)
|
||||
fn call(&self, req: Req) -> Self::Future {
|
||||
(self.wrap_fn)(req, &self.service)
|
||||
}
|
||||
}
|
||||
|
||||
@ -112,7 +112,7 @@ pub struct ApplyFactory<SF, F, Req, In, Res, Err> {
|
||||
impl<SF, F, Fut, Req, In, Res, Err> ApplyFactory<SF, F, Req, In, Res, Err>
|
||||
where
|
||||
SF: ServiceFactory<In, Error = Err>,
|
||||
F: FnMut(Req, &mut SF::Service) -> Fut + Clone,
|
||||
F: Fn(Req, &SF::Service) -> Fut + Clone,
|
||||
Fut: Future<Output = Result<Res, Err>>,
|
||||
{
|
||||
/// Create new `ApplyFactory` new service instance
|
||||
@ -128,7 +128,7 @@ where
|
||||
impl<SF, F, Fut, Req, In, Res, Err> Clone for ApplyFactory<SF, F, Req, In, Res, Err>
|
||||
where
|
||||
SF: ServiceFactory<In, Error = Err> + Clone,
|
||||
F: FnMut(Req, &mut SF::Service) -> Fut + Clone,
|
||||
F: Fn(Req, &SF::Service) -> Fut + Clone,
|
||||
Fut: Future<Output = Result<Res, Err>>,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
@ -144,7 +144,7 @@ impl<SF, F, Fut, Req, In, Res, Err> ServiceFactory<Req>
|
||||
for ApplyFactory<SF, F, Req, In, Res, Err>
|
||||
where
|
||||
SF: ServiceFactory<In, Error = Err>,
|
||||
F: FnMut(Req, &mut SF::Service) -> Fut + Clone,
|
||||
F: Fn(Req, &SF::Service) -> Fut + Clone,
|
||||
Fut: Future<Output = Result<Res, Err>>,
|
||||
{
|
||||
type Response = Res;
|
||||
@ -165,7 +165,7 @@ pin_project! {
|
||||
pub struct ApplyServiceFactoryResponse<SF, F, Fut, Req, In, Res, Err>
|
||||
where
|
||||
SF: ServiceFactory<In, Error = Err>,
|
||||
F: FnMut(Req, &mut SF::Service) -> Fut,
|
||||
F: Fn(Req, &SF::Service) -> Fut,
|
||||
Fut: Future<Output = Result<Res, Err>>,
|
||||
{
|
||||
#[pin]
|
||||
@ -178,7 +178,7 @@ pin_project! {
|
||||
impl<SF, F, Fut, Req, In, Res, Err> ApplyServiceFactoryResponse<SF, F, Fut, Req, In, Res, Err>
|
||||
where
|
||||
SF: ServiceFactory<In, Error = Err>,
|
||||
F: FnMut(Req, &mut SF::Service) -> Fut,
|
||||
F: Fn(Req, &SF::Service) -> Fut,
|
||||
Fut: Future<Output = Result<Res, Err>>,
|
||||
{
|
||||
fn new(fut: SF::Future, wrap_fn: F) -> Self {
|
||||
@ -194,7 +194,7 @@ impl<SF, F, Fut, Req, In, Res, Err> Future
|
||||
for ApplyServiceFactoryResponse<SF, F, Fut, Req, In, Res, Err>
|
||||
where
|
||||
SF: ServiceFactory<In, Error = Err>,
|
||||
F: FnMut(Req, &mut SF::Service) -> Fut,
|
||||
F: Fn(Req, &SF::Service) -> Fut,
|
||||
Fut: Future<Output = Result<Res, Err>>,
|
||||
{
|
||||
type Output = Result<Apply<SF::Service, F, Req, In, Res, Err>, SF::InitError>;
|
||||
@ -226,14 +226,14 @@ mod tests {
|
||||
|
||||
crate::always_ready!();
|
||||
|
||||
fn call(&mut self, _: ()) -> Self::Future {
|
||||
fn call(&self, _: ()) -> Self::Future {
|
||||
ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_call() {
|
||||
let mut srv = pipeline(apply_fn(Srv, |req: &'static str, srv| {
|
||||
let srv = pipeline(apply_fn(Srv, |req: &'static str, srv| {
|
||||
let fut = srv.call(());
|
||||
async move {
|
||||
fut.await.unwrap();
|
||||
@ -261,7 +261,7 @@ mod tests {
|
||||
},
|
||||
));
|
||||
|
||||
let mut srv = new_srv.new_service(()).await.unwrap();
|
||||
let srv = new_srv.new_service(()).await.unwrap();
|
||||
|
||||
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
|
||||
|
||||
|
@ -1,17 +1,17 @@
|
||||
use alloc::rc::Rc;
|
||||
use core::{
|
||||
cell::RefCell,
|
||||
future::Future,
|
||||
marker::PhantomData,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use alloc::rc::Rc;
|
||||
use futures_core::ready;
|
||||
use pin_project_lite::pin_project;
|
||||
|
||||
use crate::{Service, ServiceFactory};
|
||||
|
||||
/// Convert `Fn(Config, &mut Service1) -> Future<Service2>` fn to a service factory.
|
||||
/// Convert `Fn(Config, &Service1) -> Future<Service2>` fn to a service factory.
|
||||
pub fn apply_cfg<S1, Req, F, Cfg, Fut, S2, Err>(
|
||||
srv: S1,
|
||||
f: F,
|
||||
@ -26,17 +26,17 @@ pub fn apply_cfg<S1, Req, F, Cfg, Fut, S2, Err>(
|
||||
> + Clone
|
||||
where
|
||||
S1: Service<Req>,
|
||||
F: FnMut(Cfg, &mut S1) -> Fut,
|
||||
F: Fn(Cfg, &S1) -> Fut,
|
||||
Fut: Future<Output = Result<S2, Err>>,
|
||||
S2: Service<Req>,
|
||||
{
|
||||
ApplyConfigService {
|
||||
srv: Rc::new(RefCell::new((srv, f))),
|
||||
srv: Rc::new((srv, f)),
|
||||
_phantom: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert `Fn(Config, &mut ServiceFactory1) -> Future<ServiceFactory2>` fn to a service factory.
|
||||
/// Convert `Fn(Config, &ServiceFactory1) -> Future<ServiceFactory2>` fn to a service factory.
|
||||
///
|
||||
/// Service1 get constructed from `T` factory.
|
||||
pub fn apply_cfg_factory<SF, Req, F, Cfg, Fut, S>(
|
||||
@ -52,33 +52,33 @@ pub fn apply_cfg_factory<SF, Req, F, Cfg, Fut, S>(
|
||||
> + Clone
|
||||
where
|
||||
SF: ServiceFactory<Req, Config = ()>,
|
||||
F: FnMut(Cfg, &mut SF::Service) -> Fut,
|
||||
F: Fn(Cfg, &SF::Service) -> Fut,
|
||||
SF::InitError: From<SF::Error>,
|
||||
Fut: Future<Output = Result<S, SF::InitError>>,
|
||||
S: Service<Req>,
|
||||
{
|
||||
ApplyConfigServiceFactory {
|
||||
srv: Rc::new(RefCell::new((factory, f))),
|
||||
srv: Rc::new((factory, f)),
|
||||
_phantom: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert `Fn(Config, &mut Server) -> Future<Service>` fn to NewService\
|
||||
/// Convert `Fn(Config, &Server) -> Future<Service>` fn to NewService\
|
||||
struct ApplyConfigService<S1, Req, F, Cfg, Fut, S2, Err>
|
||||
where
|
||||
S1: Service<Req>,
|
||||
F: FnMut(Cfg, &mut S1) -> Fut,
|
||||
F: Fn(Cfg, &S1) -> Fut,
|
||||
Fut: Future<Output = Result<S2, Err>>,
|
||||
S2: Service<Req>,
|
||||
{
|
||||
srv: Rc<RefCell<(S1, F)>>,
|
||||
srv: Rc<(S1, F)>,
|
||||
_phantom: PhantomData<(Cfg, Req, Fut, S2)>,
|
||||
}
|
||||
|
||||
impl<S1, Req, F, Cfg, Fut, S2, Err> Clone for ApplyConfigService<S1, Req, F, Cfg, Fut, S2, Err>
|
||||
where
|
||||
S1: Service<Req>,
|
||||
F: FnMut(Cfg, &mut S1) -> Fut,
|
||||
F: Fn(Cfg, &S1) -> Fut,
|
||||
Fut: Future<Output = Result<S2, Err>>,
|
||||
S2: Service<Req>,
|
||||
{
|
||||
@ -94,20 +94,20 @@ impl<S1, Req, F, Cfg, Fut, S2, Err> ServiceFactory<Req>
|
||||
for ApplyConfigService<S1, Req, F, Cfg, Fut, S2, Err>
|
||||
where
|
||||
S1: Service<Req>,
|
||||
F: FnMut(Cfg, &mut S1) -> Fut,
|
||||
F: Fn(Cfg, &S1) -> Fut,
|
||||
Fut: Future<Output = Result<S2, Err>>,
|
||||
S2: Service<Req>,
|
||||
{
|
||||
type Config = Cfg;
|
||||
type Response = S2::Response;
|
||||
type Error = S2::Error;
|
||||
type Config = Cfg;
|
||||
type Service = S2;
|
||||
|
||||
type InitError = Err;
|
||||
type Future = Fut;
|
||||
|
||||
fn new_service(&self, cfg: Cfg) -> Self::Future {
|
||||
let (t, f) = &mut *self.srv.borrow_mut();
|
||||
let (t, f) = &*self.srv;
|
||||
f(cfg, t)
|
||||
}
|
||||
}
|
||||
@ -116,18 +116,18 @@ where
|
||||
struct ApplyConfigServiceFactory<SF, Req, F, Cfg, Fut, S>
|
||||
where
|
||||
SF: ServiceFactory<Req, Config = ()>,
|
||||
F: FnMut(Cfg, &mut SF::Service) -> Fut,
|
||||
F: Fn(Cfg, &SF::Service) -> Fut,
|
||||
Fut: Future<Output = Result<S, SF::InitError>>,
|
||||
S: Service<Req>,
|
||||
{
|
||||
srv: Rc<RefCell<(SF, F)>>,
|
||||
srv: Rc<(SF, F)>,
|
||||
_phantom: PhantomData<(Cfg, Req, Fut, S)>,
|
||||
}
|
||||
|
||||
impl<SF, Req, F, Cfg, Fut, S> Clone for ApplyConfigServiceFactory<SF, Req, F, Cfg, Fut, S>
|
||||
where
|
||||
SF: ServiceFactory<Req, Config = ()>,
|
||||
F: FnMut(Cfg, &mut SF::Service) -> Fut,
|
||||
F: Fn(Cfg, &SF::Service) -> Fut,
|
||||
Fut: Future<Output = Result<S, SF::InitError>>,
|
||||
S: Service<Req>,
|
||||
{
|
||||
@ -144,13 +144,13 @@ impl<SF, Req, F, Cfg, Fut, S> ServiceFactory<Req>
|
||||
where
|
||||
SF: ServiceFactory<Req, Config = ()>,
|
||||
SF::InitError: From<SF::Error>,
|
||||
F: FnMut(Cfg, &mut SF::Service) -> Fut,
|
||||
F: Fn(Cfg, &SF::Service) -> Fut,
|
||||
Fut: Future<Output = Result<S, SF::InitError>>,
|
||||
S: Service<Req>,
|
||||
{
|
||||
type Config = Cfg;
|
||||
type Response = S::Response;
|
||||
type Error = S::Error;
|
||||
type Config = Cfg;
|
||||
type Service = S;
|
||||
|
||||
type InitError = SF::InitError;
|
||||
@ -161,7 +161,7 @@ where
|
||||
cfg: Some(cfg),
|
||||
store: self.srv.clone(),
|
||||
state: State::A {
|
||||
fut: self.srv.borrow().0.new_service(()),
|
||||
fut: self.srv.0.new_service(()),
|
||||
},
|
||||
}
|
||||
}
|
||||
@ -172,12 +172,12 @@ pin_project! {
|
||||
where
|
||||
SF: ServiceFactory<Req, Config = ()>,
|
||||
SF::InitError: From<SF::Error>,
|
||||
F: FnMut(Cfg, &mut SF::Service) -> Fut,
|
||||
F: Fn(Cfg, &SF::Service) -> Fut,
|
||||
Fut: Future<Output = Result<S, SF::InitError>>,
|
||||
S: Service<Req>,
|
||||
{
|
||||
cfg: Option<Cfg>,
|
||||
store: Rc<RefCell<(SF, F)>>,
|
||||
store: Rc<(SF, F)>,
|
||||
#[pin]
|
||||
state: State<SF, Fut, S, Req>,
|
||||
}
|
||||
@ -203,7 +203,7 @@ impl<SF, Req, F, Cfg, Fut, S> Future
|
||||
where
|
||||
SF: ServiceFactory<Req, Config = ()>,
|
||||
SF::InitError: From<SF::Error>,
|
||||
F: FnMut(Cfg, &mut SF::Service) -> Fut,
|
||||
F: Fn(Cfg, &SF::Service) -> Fut,
|
||||
Fut: Future<Output = Result<S, SF::InitError>>,
|
||||
S: Service<Req>,
|
||||
{
|
||||
@ -213,24 +213,20 @@ where
|
||||
let mut this = self.as_mut().project();
|
||||
|
||||
match this.state.as_mut().project() {
|
||||
StateProj::A { fut } => match fut.poll(cx)? {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(svc) => {
|
||||
this.state.set(State::B { svc });
|
||||
self.poll(cx)
|
||||
StateProj::A { fut } => {
|
||||
let svc = ready!(fut.poll(cx))?;
|
||||
this.state.set(State::B { svc });
|
||||
self.poll(cx)
|
||||
}
|
||||
StateProj::B { svc } => {
|
||||
ready!(svc.poll_ready(cx))?;
|
||||
{
|
||||
let (_, f) = &**this.store;
|
||||
let fut = f(this.cfg.take().unwrap(), svc);
|
||||
this.state.set(State::C { fut });
|
||||
}
|
||||
},
|
||||
StateProj::B { svc } => match svc.poll_ready(cx)? {
|
||||
Poll::Ready(_) => {
|
||||
{
|
||||
let (_, f) = &mut *this.store.borrow_mut();
|
||||
let fut = f(this.cfg.take().unwrap(), svc);
|
||||
this.state.set(State::C { fut });
|
||||
}
|
||||
self.poll(cx)
|
||||
}
|
||||
Poll::Pending => Poll::Pending,
|
||||
},
|
||||
self.poll(cx)
|
||||
}
|
||||
StateProj::C { fut } => fut.poll(cx),
|
||||
}
|
||||
}
|
||||
|
@ -131,11 +131,11 @@ where
|
||||
type Error = Err;
|
||||
type Future = BoxFuture<Result<Res, Err>>;
|
||||
|
||||
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.0.poll_ready(ctx)
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Req) -> Self::Future {
|
||||
fn call(&self, req: Req) -> Self::Future {
|
||||
Box::pin(self.0.call(req))
|
||||
}
|
||||
}
|
||||
|
@ -7,7 +7,7 @@ pub fn fn_service<F, Fut, Req, Res, Err, Cfg>(
|
||||
f: F,
|
||||
) -> FnServiceFactory<F, Fut, Req, Res, Err, Cfg>
|
||||
where
|
||||
F: FnMut(Req) -> Fut + Clone,
|
||||
F: Fn(Req) -> Fut + Clone,
|
||||
Fut: Future<Output = Result<Res, Err>>,
|
||||
{
|
||||
FnServiceFactory::new(f)
|
||||
@ -39,7 +39,7 @@ where
|
||||
/// });
|
||||
///
|
||||
/// // construct new service
|
||||
/// let mut srv = factory.new_service(()).await?;
|
||||
/// let srv = factory.new_service(()).await?;
|
||||
///
|
||||
/// // now we can use `div` service
|
||||
/// let result = srv.call((10, 20)).await?;
|
||||
@ -81,7 +81,7 @@ where
|
||||
/// });
|
||||
///
|
||||
/// // construct new service with config argument
|
||||
/// let mut srv = factory.new_service(10).await?;
|
||||
/// let srv = factory.new_service(10).await?;
|
||||
///
|
||||
/// let result = srv.call(10).await?;
|
||||
/// assert_eq!(result, 100);
|
||||
@ -132,7 +132,7 @@ where
|
||||
|
||||
impl<F, Fut, Req, Res, Err> Service<Req> for FnService<F, Fut, Req, Res, Err>
|
||||
where
|
||||
F: FnMut(Req) -> Fut,
|
||||
F: Fn(Req) -> Fut,
|
||||
Fut: Future<Output = Result<Res, Err>>,
|
||||
{
|
||||
type Response = Res;
|
||||
@ -141,14 +141,14 @@ where
|
||||
|
||||
crate::always_ready!();
|
||||
|
||||
fn call(&mut self, req: Req) -> Self::Future {
|
||||
fn call(&self, req: Req) -> Self::Future {
|
||||
(self.f)(req)
|
||||
}
|
||||
}
|
||||
|
||||
impl<F, Fut, Req, Res, Err> IntoService<FnService<F, Fut, Req, Res, Err>, Req> for F
|
||||
where
|
||||
F: FnMut(Req) -> Fut,
|
||||
F: Fn(Req) -> Fut,
|
||||
Fut: Future<Output = Result<Res, Err>>,
|
||||
{
|
||||
fn into_service(self) -> FnService<F, Fut, Req, Res, Err> {
|
||||
@ -158,7 +158,7 @@ where
|
||||
|
||||
pub struct FnServiceFactory<F, Fut, Req, Res, Err, Cfg>
|
||||
where
|
||||
F: FnMut(Req) -> Fut,
|
||||
F: Fn(Req) -> Fut,
|
||||
Fut: Future<Output = Result<Res, Err>>,
|
||||
{
|
||||
f: F,
|
||||
@ -167,7 +167,7 @@ where
|
||||
|
||||
impl<F, Fut, Req, Res, Err, Cfg> FnServiceFactory<F, Fut, Req, Res, Err, Cfg>
|
||||
where
|
||||
F: FnMut(Req) -> Fut + Clone,
|
||||
F: Fn(Req) -> Fut + Clone,
|
||||
Fut: Future<Output = Result<Res, Err>>,
|
||||
{
|
||||
fn new(f: F) -> Self {
|
||||
@ -177,7 +177,7 @@ where
|
||||
|
||||
impl<F, Fut, Req, Res, Err, Cfg> Clone for FnServiceFactory<F, Fut, Req, Res, Err, Cfg>
|
||||
where
|
||||
F: FnMut(Req) -> Fut + Clone,
|
||||
F: Fn(Req) -> Fut + Clone,
|
||||
Fut: Future<Output = Result<Res, Err>>,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
@ -187,7 +187,7 @@ where
|
||||
|
||||
impl<F, Fut, Req, Res, Err> Service<Req> for FnServiceFactory<F, Fut, Req, Res, Err, ()>
|
||||
where
|
||||
F: FnMut(Req) -> Fut + Clone,
|
||||
F: Fn(Req) -> Fut + Clone,
|
||||
Fut: Future<Output = Result<Res, Err>>,
|
||||
{
|
||||
type Response = Res;
|
||||
@ -196,7 +196,7 @@ where
|
||||
|
||||
crate::always_ready!();
|
||||
|
||||
fn call(&mut self, req: Req) -> Self::Future {
|
||||
fn call(&self, req: Req) -> Self::Future {
|
||||
(self.f)(req)
|
||||
}
|
||||
}
|
||||
@ -204,7 +204,7 @@ where
|
||||
impl<F, Fut, Req, Res, Err, Cfg> ServiceFactory<Req>
|
||||
for FnServiceFactory<F, Fut, Req, Res, Err, Cfg>
|
||||
where
|
||||
F: FnMut(Req) -> Fut + Clone,
|
||||
F: Fn(Req) -> Fut + Clone,
|
||||
Fut: Future<Output = Result<Res, Err>>,
|
||||
{
|
||||
type Response = Res;
|
||||
@ -318,8 +318,8 @@ where
|
||||
{
|
||||
type Response = Srv::Response;
|
||||
type Error = Srv::Error;
|
||||
type Service = Srv;
|
||||
type Config = Cfg;
|
||||
type Service = Srv;
|
||||
type InitError = Err;
|
||||
type Future = Fut;
|
||||
|
||||
@ -364,7 +364,7 @@ mod tests {
|
||||
async fn test_fn_service() {
|
||||
let new_srv = fn_service(|()| ok::<_, ()>("srv"));
|
||||
|
||||
let mut srv = new_srv.new_service(()).await.unwrap();
|
||||
let srv = new_srv.new_service(()).await.unwrap();
|
||||
let res = srv.call(()).await;
|
||||
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
|
||||
assert!(res.is_ok());
|
||||
@ -373,7 +373,7 @@ mod tests {
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_fn_service_service() {
|
||||
let mut srv = fn_service(|()| ok::<_, ()>("srv"));
|
||||
let srv = fn_service(|()| ok::<_, ()>("srv"));
|
||||
|
||||
let res = srv.call(()).await;
|
||||
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
|
||||
@ -387,7 +387,7 @@ mod tests {
|
||||
ok::<_, ()>(fn_service(move |()| ok::<_, ()>(("srv", cfg))))
|
||||
});
|
||||
|
||||
let mut srv = new_srv.new_service(1).await.unwrap();
|
||||
let srv = new_srv.new_service(1).await.unwrap();
|
||||
let res = srv.call(()).await;
|
||||
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
|
||||
assert!(res.is_ok());
|
||||
|
@ -69,9 +69,9 @@ use self::ready::{err, ok, ready, Ready};
|
||||
/// type Error = MyError;
|
||||
/// type Future = Pin<Box<Future<Output=Result<Self::Response, Self::Error>>>>;
|
||||
///
|
||||
/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { ... }
|
||||
/// fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { ... }
|
||||
///
|
||||
/// fn call(&mut self, req: Self::Request) -> Self::Future { ... }
|
||||
/// fn call(&self, req: Self::Request) -> Self::Future { ... }
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
@ -104,7 +104,7 @@ pub trait Service<Req> {
|
||||
/// # Notes
|
||||
/// 1. `.poll_ready()` might be called on different task from actual service call.
|
||||
/// 1. In case of chained services, `.poll_ready()` get called for all services at once.
|
||||
fn poll_ready(&mut self, ctx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>>;
|
||||
fn poll_ready(&self, ctx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>>;
|
||||
|
||||
/// Process the request and return the response asynchronously.
|
||||
///
|
||||
@ -115,7 +115,7 @@ pub trait Service<Req> {
|
||||
///
|
||||
/// Calling `call` without calling `poll_ready` is permitted. The
|
||||
/// implementation must be resilient to this fact.
|
||||
fn call(&mut self, req: Req) -> Self::Future;
|
||||
fn call(&self, req: Req) -> Self::Future;
|
||||
}
|
||||
|
||||
/// Factory for creating `Service`s.
|
||||
@ -158,11 +158,11 @@ where
|
||||
type Error = S::Error;
|
||||
type Future = S::Future;
|
||||
|
||||
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
(**self).poll_ready(ctx)
|
||||
}
|
||||
|
||||
fn call(&mut self, request: Req) -> S::Future {
|
||||
fn call(&self, request: Req) -> S::Future {
|
||||
(**self).call(request)
|
||||
}
|
||||
}
|
||||
@ -175,11 +175,11 @@ where
|
||||
type Error = S::Error;
|
||||
type Future = S::Future;
|
||||
|
||||
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
|
||||
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
|
||||
(**self).poll_ready(ctx)
|
||||
}
|
||||
|
||||
fn call(&mut self, request: Req) -> S::Future {
|
||||
fn call(&self, request: Req) -> S::Future {
|
||||
(**self).call(request)
|
||||
}
|
||||
}
|
||||
@ -192,12 +192,12 @@ where
|
||||
type Error = S::Error;
|
||||
type Future = S::Future;
|
||||
|
||||
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.borrow_mut().poll_ready(ctx)
|
||||
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.borrow().poll_ready(ctx)
|
||||
}
|
||||
|
||||
fn call(&mut self, request: Req) -> S::Future {
|
||||
self.borrow_mut().call(request)
|
||||
fn call(&self, request: Req) -> S::Future {
|
||||
self.borrow().call(request)
|
||||
}
|
||||
}
|
||||
|
||||
@ -209,12 +209,12 @@ where
|
||||
type Error = S::Error;
|
||||
type Future = S::Future;
|
||||
|
||||
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.borrow_mut().poll_ready(ctx)
|
||||
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.borrow().poll_ready(ctx)
|
||||
}
|
||||
|
||||
fn call(&mut self, request: Req) -> S::Future {
|
||||
(&mut (**self).borrow_mut()).call(request)
|
||||
fn call(&self, request: Req) -> S::Future {
|
||||
self.borrow().call(request)
|
||||
}
|
||||
}
|
||||
|
||||
@ -311,8 +311,9 @@ pub mod dev {
|
||||
#[macro_export]
|
||||
macro_rules! always_ready {
|
||||
() => {
|
||||
#[inline]
|
||||
fn poll_ready(
|
||||
&mut self,
|
||||
&self,
|
||||
_: &mut ::core::task::Context<'_>,
|
||||
) -> ::core::task::Poll<Result<(), Self::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
@ -323,8 +324,9 @@ macro_rules! always_ready {
|
||||
#[macro_export]
|
||||
macro_rules! forward_ready {
|
||||
($field:ident) => {
|
||||
#[inline]
|
||||
fn poll_ready(
|
||||
&mut self,
|
||||
&self,
|
||||
cx: &mut ::core::task::Context<'_>,
|
||||
) -> ::core::task::Poll<Result<(), Self::Error>> {
|
||||
self.$field
|
||||
|
@ -58,7 +58,7 @@ where
|
||||
|
||||
crate::forward_ready!(service);
|
||||
|
||||
fn call(&mut self, req: Req) -> Self::Future {
|
||||
fn call(&self, req: Req) -> Self::Future {
|
||||
MapFuture::new(self.service.call(req), self.f.clone())
|
||||
}
|
||||
}
|
||||
@ -215,21 +215,21 @@ mod tests {
|
||||
|
||||
crate::always_ready!();
|
||||
|
||||
fn call(&mut self, _: ()) -> Self::Future {
|
||||
fn call(&self, _: ()) -> Self::Future {
|
||||
ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_poll_ready() {
|
||||
let mut srv = Srv.map(|_| "ok");
|
||||
let srv = Srv.map(|_| "ok");
|
||||
let res = lazy(|cx| srv.poll_ready(cx)).await;
|
||||
assert_eq!(res, Poll::Ready(Ok(())));
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_call() {
|
||||
let mut srv = Srv.map(|_| "ok");
|
||||
let srv = Srv.map(|_| "ok");
|
||||
let res = srv.call(()).await;
|
||||
assert!(res.is_ok());
|
||||
assert_eq!(res.unwrap(), "ok");
|
||||
@ -238,7 +238,7 @@ mod tests {
|
||||
#[actix_rt::test]
|
||||
async fn test_new_service() {
|
||||
let new_srv = (|| ok::<_, ()>(Srv)).into_factory().map(|_| "ok");
|
||||
let mut srv = new_srv.new_service(&()).await.unwrap();
|
||||
let srv = new_srv.new_service(&()).await.unwrap();
|
||||
let res = srv.call(()).await;
|
||||
assert!(res.is_ok());
|
||||
assert_eq!(res.unwrap(), ("ok"));
|
||||
|
@ -57,11 +57,11 @@ where
|
||||
type Error = E;
|
||||
type Future = MapErrFuture<A, Req, F, E>;
|
||||
|
||||
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.service.poll_ready(ctx).map_err(&self.f)
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Req) -> Self::Future {
|
||||
fn call(&self, req: Req) -> Self::Future {
|
||||
MapErrFuture::new(self.service.call(req), self.f.clone())
|
||||
}
|
||||
}
|
||||
@ -218,25 +218,25 @@ mod tests {
|
||||
type Error = ();
|
||||
type Future = Ready<Result<(), ()>>;
|
||||
|
||||
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
Poll::Ready(Err(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, _: ()) -> Self::Future {
|
||||
fn call(&self, _: ()) -> Self::Future {
|
||||
err(())
|
||||
}
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_poll_ready() {
|
||||
let mut srv = Srv.map_err(|_| "error");
|
||||
let srv = Srv.map_err(|_| "error");
|
||||
let res = lazy(|cx| srv.poll_ready(cx)).await;
|
||||
assert_eq!(res, Poll::Ready(Err("error")));
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_call() {
|
||||
let mut srv = Srv.map_err(|_| "error");
|
||||
let srv = Srv.map_err(|_| "error");
|
||||
let res = srv.call(()).await;
|
||||
assert!(res.is_err());
|
||||
assert_eq!(res.err().unwrap(), "error");
|
||||
@ -245,7 +245,7 @@ mod tests {
|
||||
#[actix_rt::test]
|
||||
async fn test_new_service() {
|
||||
let new_srv = (|| ok::<_, ()>(Srv)).into_factory().map_err(|_| "error");
|
||||
let mut srv = new_srv.new_service(&()).await.unwrap();
|
||||
let srv = new_srv.new_service(&()).await.unwrap();
|
||||
let res = srv.call(()).await;
|
||||
assert!(res.is_err());
|
||||
assert_eq!(res.err().unwrap(), "error");
|
||||
|
@ -146,12 +146,12 @@ impl<S: Service<Req>, Req> Service<Req> for Pipeline<S, Req> {
|
||||
type Future = S::Future;
|
||||
|
||||
#[inline]
|
||||
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
|
||||
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
|
||||
self.service.poll_ready(ctx)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn call(&mut self, req: Req) -> Self::Future {
|
||||
fn call(&self, req: Req) -> Self::Future {
|
||||
self.service.call(req)
|
||||
}
|
||||
}
|
||||
|
@ -1,12 +1,12 @@
|
||||
use alloc::rc::Rc;
|
||||
use core::{
|
||||
cell::RefCell,
|
||||
future::Future,
|
||||
marker::PhantomData,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use alloc::rc::Rc;
|
||||
use futures_core::ready;
|
||||
use pin_project_lite::pin_project;
|
||||
|
||||
use super::{Service, ServiceFactory};
|
||||
@ -15,7 +15,7 @@ use super::{Service, ServiceFactory};
|
||||
/// another service.
|
||||
///
|
||||
/// This is created by the `Pipeline::then` method.
|
||||
pub(crate) struct ThenService<A, B, Req>(Rc<RefCell<(A, B)>>, PhantomData<Req>);
|
||||
pub(crate) struct ThenService<A, B, Req>(Rc<(A, B)>, PhantomData<Req>);
|
||||
|
||||
impl<A, B, Req> ThenService<A, B, Req> {
|
||||
/// Create new `.then()` combinator
|
||||
@ -24,7 +24,7 @@ impl<A, B, Req> ThenService<A, B, Req> {
|
||||
A: Service<Req>,
|
||||
B: Service<Result<A::Response, A::Error>, Error = A::Error>,
|
||||
{
|
||||
Self(Rc::new(RefCell::new((a, b))), PhantomData)
|
||||
Self(Rc::new((a, b)), PhantomData)
|
||||
}
|
||||
}
|
||||
|
||||
@ -43,20 +43,20 @@ where
|
||||
type Error = B::Error;
|
||||
type Future = ThenServiceResponse<A, B, Req>;
|
||||
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
let mut srv = self.0.borrow_mut();
|
||||
let not_ready = !srv.0.poll_ready(cx)?.is_ready();
|
||||
if !srv.1.poll_ready(cx)?.is_ready() || not_ready {
|
||||
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
let (a, b) = &*self.0;
|
||||
let not_ready = !a.poll_ready(cx)?.is_ready();
|
||||
if !b.poll_ready(cx)?.is_ready() || not_ready {
|
||||
Poll::Pending
|
||||
} else {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Req) -> Self::Future {
|
||||
fn call(&self, req: Req) -> Self::Future {
|
||||
ThenServiceResponse {
|
||||
state: State::A {
|
||||
fut: self.0.borrow_mut().0.call(req),
|
||||
fut: self.0 .0.call(req),
|
||||
b: Some(self.0.clone()),
|
||||
},
|
||||
}
|
||||
@ -81,9 +81,8 @@ pin_project! {
|
||||
A: Service<Req>,
|
||||
B: Service<Result<A::Response, A::Error>>,
|
||||
{
|
||||
A { #[pin] fut: A::Future, b: Option<Rc<RefCell<(A, B)>>> },
|
||||
A { #[pin] fut: A::Future, b: Option<Rc<(A, B)>> },
|
||||
B { #[pin] fut: B::Future },
|
||||
Empty,
|
||||
}
|
||||
}
|
||||
|
||||
@ -98,23 +97,14 @@ where
|
||||
let mut this = self.as_mut().project();
|
||||
|
||||
match this.state.as_mut().project() {
|
||||
StateProj::A { fut, b } => match fut.poll(cx) {
|
||||
Poll::Ready(res) => {
|
||||
let b = b.take().unwrap();
|
||||
this.state.set(State::Empty); // drop fut A
|
||||
let fut = b.borrow_mut().1.call(res);
|
||||
this.state.set(State::B { fut });
|
||||
self.poll(cx)
|
||||
}
|
||||
Poll::Pending => Poll::Pending,
|
||||
},
|
||||
StateProj::B { fut } => fut.poll(cx).map(|r| {
|
||||
this.state.set(State::Empty);
|
||||
r
|
||||
}),
|
||||
StateProj::Empty => {
|
||||
panic!("future must not be polled after it returned `Poll::Ready`")
|
||||
StateProj::A { fut, b } => {
|
||||
let res = ready!(fut.poll(cx));
|
||||
let b = b.take().unwrap();
|
||||
let fut = b.1.call(res);
|
||||
this.state.set(State::B { fut });
|
||||
self.poll(cx)
|
||||
}
|
||||
StateProj::B { fut } => fut.poll(cx),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -266,12 +256,12 @@ mod tests {
|
||||
type Error = ();
|
||||
type Future = Ready<Result<Self::Response, Self::Error>>;
|
||||
|
||||
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.0.set(self.0.get() + 1);
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Result<&'static str, &'static str>) -> Self::Future {
|
||||
fn call(&self, req: Result<&'static str, &'static str>) -> Self::Future {
|
||||
match req {
|
||||
Ok(msg) => ok(msg),
|
||||
Err(_) => err(()),
|
||||
@ -286,12 +276,12 @@ mod tests {
|
||||
type Error = ();
|
||||
type Future = Ready<Result<Self::Response, ()>>;
|
||||
|
||||
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.0.set(self.0.get() + 1);
|
||||
Poll::Ready(Err(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Result<&'static str, ()>) -> Self::Future {
|
||||
fn call(&self, req: Result<&'static str, ()>) -> Self::Future {
|
||||
match req {
|
||||
Ok(msg) => ok((msg, "ok")),
|
||||
Err(()) => ok(("srv2", "err")),
|
||||
@ -302,7 +292,7 @@ mod tests {
|
||||
#[actix_rt::test]
|
||||
async fn test_poll_ready() {
|
||||
let cnt = Rc::new(Cell::new(0));
|
||||
let mut srv = pipeline(Srv1(cnt.clone())).then(Srv2(cnt.clone()));
|
||||
let srv = pipeline(Srv1(cnt.clone())).then(Srv2(cnt.clone()));
|
||||
let res = lazy(|cx| srv.poll_ready(cx)).await;
|
||||
assert_eq!(res, Poll::Ready(Err(())));
|
||||
assert_eq!(cnt.get(), 2);
|
||||
@ -311,7 +301,7 @@ mod tests {
|
||||
#[actix_rt::test]
|
||||
async fn test_call() {
|
||||
let cnt = Rc::new(Cell::new(0));
|
||||
let mut srv = pipeline(Srv1(cnt.clone())).then(Srv2(cnt));
|
||||
let srv = pipeline(Srv1(cnt.clone())).then(Srv2(cnt));
|
||||
|
||||
let res = srv.call(Ok("srv1")).await;
|
||||
assert!(res.is_ok());
|
||||
@ -328,7 +318,7 @@ mod tests {
|
||||
let cnt2 = cnt.clone();
|
||||
let blank = move || ready(Ok::<_, ()>(Srv1(cnt2.clone())));
|
||||
let factory = pipeline_factory(blank).then(move || ready(Ok(Srv2(cnt.clone()))));
|
||||
let mut srv = factory.new_service(&()).await.unwrap();
|
||||
let srv = factory.new_service(&()).await.unwrap();
|
||||
let res = srv.call(Ok("srv1")).await;
|
||||
assert!(res.is_ok());
|
||||
assert_eq!(res.unwrap(), ("srv1", "ok"));
|
||||
|
@ -1,4 +1,3 @@
|
||||
use alloc::{rc::Rc, sync::Arc};
|
||||
use core::{
|
||||
future::Future,
|
||||
marker::PhantomData,
|
||||
@ -6,6 +5,8 @@ use core::{
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use alloc::{rc::Rc, sync::Arc};
|
||||
use futures_core::ready;
|
||||
use pin_project_lite::pin_project;
|
||||
|
||||
use crate::transform_err::TransformMapInitErr;
|
||||
@ -47,7 +48,7 @@ where
|
||||
///
|
||||
/// actix_service::forward_ready!(service);
|
||||
///
|
||||
/// fn call(&mut self, req: S::Request) -> Self::Future {
|
||||
/// fn call(&self, req: S::Request) -> Self::Future {
|
||||
/// TimeoutServiceResponse {
|
||||
/// fut: self.service.call(req),
|
||||
/// sleep: Delay::new(clock::now() + self.timeout),
|
||||
@ -127,8 +128,8 @@ where
|
||||
{
|
||||
type Response = T::Response;
|
||||
type Error = T::Error;
|
||||
type InitError = T::InitError;
|
||||
type Transform = T::Transform;
|
||||
type InitError = T::InitError;
|
||||
type Future = T::Future;
|
||||
|
||||
fn new_transform(&self, service: S) -> T::Future {
|
||||
@ -142,8 +143,8 @@ where
|
||||
{
|
||||
type Response = T::Response;
|
||||
type Error = T::Error;
|
||||
type InitError = T::InitError;
|
||||
type Transform = T::Transform;
|
||||
type InitError = T::InitError;
|
||||
type Future = T::Future;
|
||||
|
||||
fn new_transform(&self, service: S) -> T::Future {
|
||||
@ -229,14 +230,12 @@ where
|
||||
let mut this = self.as_mut().project();
|
||||
|
||||
match this.state.as_mut().project() {
|
||||
ApplyTransformFutureStateProj::A { fut } => match fut.poll(cx)? {
|
||||
Poll::Ready(srv) => {
|
||||
let fut = this.store.0.new_transform(srv);
|
||||
this.state.set(ApplyTransformFutureState::B { fut });
|
||||
self.poll(cx)
|
||||
}
|
||||
Poll::Pending => Poll::Pending,
|
||||
},
|
||||
ApplyTransformFutureStateProj::A { fut } => {
|
||||
let srv = ready!(fut.poll(cx))?;
|
||||
let fut = this.store.0.new_transform(srv);
|
||||
this.state.set(ApplyTransformFutureState::B { fut });
|
||||
self.poll(cx)
|
||||
}
|
||||
ApplyTransformFutureStateProj::B { fut } => fut.poll(cx),
|
||||
}
|
||||
}
|
||||
|
@ -79,7 +79,7 @@ where
|
||||
type Error = Error;
|
||||
type Future = LocalBoxFuture<'static, Result<TlsStream<T>, Error>>;
|
||||
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
if self.conns.available(cx) {
|
||||
Poll::Ready(Ok(()))
|
||||
} else {
|
||||
@ -87,7 +87,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
fn call(&mut self, io: T) -> Self::Future {
|
||||
fn call(&self, io: T) -> Self::Future {
|
||||
let guard = self.conns.get();
|
||||
let this = self.clone();
|
||||
Box::pin(async move {
|
||||
|
@ -75,7 +75,7 @@ where
|
||||
type Error = SslError;
|
||||
type Future = AcceptorServiceResponse<T>;
|
||||
|
||||
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
if self.conns.available(ctx) {
|
||||
Poll::Ready(Ok(()))
|
||||
} else {
|
||||
@ -83,7 +83,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
fn call(&mut self, io: T) -> Self::Future {
|
||||
fn call(&self, io: T) -> Self::Future {
|
||||
let ssl_ctx = self.acceptor.context();
|
||||
let ssl = Ssl::new(ssl_ctx).expect("Provided SSL acceptor was invalid.");
|
||||
AcceptorServiceResponse {
|
||||
|
@ -80,7 +80,7 @@ where
|
||||
type Error = io::Error;
|
||||
type Future = AcceptorServiceFut<T>;
|
||||
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
if self.conns.available(cx) {
|
||||
Poll::Ready(Ok(()))
|
||||
} else {
|
||||
@ -88,7 +88,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
fn call(&mut self, req: T) -> Self::Future {
|
||||
fn call(&self, req: T) -> Self::Future {
|
||||
AcceptorServiceFut {
|
||||
_guard: self.conns.get(),
|
||||
fut: self.acceptor.accept(req),
|
||||
|
@ -51,7 +51,7 @@ impl<T: Address> Service<Connect<T>> for TcpConnector {
|
||||
|
||||
actix_service::always_ready!();
|
||||
|
||||
fn call(&mut self, req: Connect<T>) -> Self::Future {
|
||||
fn call(&self, req: Connect<T>) -> Self::Future {
|
||||
let port = req.port();
|
||||
let Connect { req, addr, .. } = req;
|
||||
|
||||
|
@ -146,7 +146,7 @@ impl<T: Address> Service<Connect<T>> for Resolver {
|
||||
|
||||
actix_service::always_ready!();
|
||||
|
||||
fn call(&mut self, req: Connect<T>) -> Self::Future {
|
||||
fn call(&self, req: Connect<T>) -> Self::Future {
|
||||
if !req.addr.is_none() {
|
||||
ResolverFuture::Connected(Some(req))
|
||||
} else if let Ok(ip) = req.host().parse() {
|
||||
|
@ -80,7 +80,7 @@ impl<T: Address> Service<Connect<T>> for ConnectService {
|
||||
|
||||
actix_service::always_ready!();
|
||||
|
||||
fn call(&mut self, req: Connect<T>) -> Self::Future {
|
||||
fn call(&self, req: Connect<T>) -> Self::Future {
|
||||
ConnectServiceResponse {
|
||||
fut: ConnectFuture::Resolve(self.resolver.call(req)),
|
||||
tcp: self.tcp,
|
||||
@ -149,7 +149,7 @@ impl<T: Address> Service<Connect<T>> for TcpConnectService {
|
||||
|
||||
actix_service::always_ready!();
|
||||
|
||||
fn call(&mut self, req: Connect<T>) -> Self::Future {
|
||||
fn call(&self, req: Connect<T>) -> Self::Future {
|
||||
TcpConnectServiceResponse {
|
||||
fut: ConnectFuture::Resolve(self.resolver.call(req)),
|
||||
tcp: self.tcp,
|
||||
|
@ -84,7 +84,7 @@ where
|
||||
|
||||
actix_service::always_ready!();
|
||||
|
||||
fn call(&mut self, stream: Connection<T, U>) -> Self::Future {
|
||||
fn call(&self, stream: Connection<T, U>) -> Self::Future {
|
||||
trace!("SSL Handshake start for: {:?}", stream.host());
|
||||
let (io, stream) = stream.replace(());
|
||||
let host = stream.host();
|
||||
@ -202,7 +202,7 @@ impl<T: Address + 'static> Service<Connect<T>> for OpensslConnectService {
|
||||
|
||||
actix_service::always_ready!();
|
||||
|
||||
fn call(&mut self, req: Connect<T>) -> Self::Future {
|
||||
fn call(&self, req: Connect<T>) -> Self::Future {
|
||||
OpensslConnectServiceResponse {
|
||||
fut1: Some(self.tcp.call(req)),
|
||||
fut2: None,
|
||||
|
@ -84,7 +84,7 @@ where
|
||||
|
||||
actix_service::always_ready!();
|
||||
|
||||
fn call(&mut self, stream: Connection<T, U>) -> Self::Future {
|
||||
fn call(&self, stream: Connection<T, U>) -> Self::Future {
|
||||
trace!("SSL Handshake start for: {:?}", stream.host());
|
||||
let (io, stream) = stream.replace(());
|
||||
let host = DNSNameRef::try_from_ascii_str(stream.host())
|
||||
|
@ -22,7 +22,7 @@ async fn test_string() {
|
||||
})
|
||||
});
|
||||
|
||||
let mut conn = actix_connect::default_connector();
|
||||
let conn = actix_connect::default_connector();
|
||||
let addr = format!("localhost:{}", srv.port());
|
||||
let con = conn.call(addr.into()).await.unwrap();
|
||||
assert_eq!(con.peer_addr().unwrap(), srv.addr());
|
||||
@ -39,7 +39,7 @@ async fn test_rustls_string() {
|
||||
})
|
||||
});
|
||||
|
||||
let mut conn = actix_connect::default_connector();
|
||||
let conn = actix_connect::default_connector();
|
||||
let addr = format!("localhost:{}", srv.port());
|
||||
let con = conn.call(addr.into()).await.unwrap();
|
||||
assert_eq!(con.peer_addr().unwrap(), srv.addr());
|
||||
@ -55,13 +55,14 @@ async fn test_static_str() {
|
||||
})
|
||||
});
|
||||
|
||||
let mut conn = actix_connect::default_connector();
|
||||
let conn = actix_connect::default_connector();
|
||||
|
||||
let con = conn.call(Connect::with("10", srv.addr())).await.unwrap();
|
||||
assert_eq!(con.peer_addr().unwrap(), srv.addr());
|
||||
|
||||
let connect = Connect::new(srv.host().to_owned());
|
||||
let mut conn = actix_connect::default_connector();
|
||||
|
||||
let conn = actix_connect::default_connector();
|
||||
let con = conn.call(connect).await;
|
||||
assert!(con.is_err());
|
||||
}
|
||||
@ -78,7 +79,7 @@ async fn test_new_service() {
|
||||
|
||||
let factory = actix_connect::default_connector_factory();
|
||||
|
||||
let mut conn = factory.new_service(()).await.unwrap();
|
||||
let conn = factory.new_service(()).await.unwrap();
|
||||
let con = conn.call(Connect::with("10", srv.addr())).await.unwrap();
|
||||
assert_eq!(con.peer_addr().unwrap(), srv.addr());
|
||||
}
|
||||
@ -126,7 +127,7 @@ async fn test_custom_resolver() {
|
||||
|
||||
let factory = actix_connect::new_connector_factory(resolver);
|
||||
|
||||
let mut conn = factory.new_service(()).await.unwrap();
|
||||
let conn = factory.new_service(()).await.unwrap();
|
||||
let con = conn.call(Connect::with("10", srv.addr())).await.unwrap();
|
||||
assert_eq!(con.peer_addr().unwrap(), srv.addr());
|
||||
}
|
||||
@ -144,7 +145,7 @@ async fn test_openssl_uri() {
|
||||
})
|
||||
});
|
||||
|
||||
let mut conn = actix_connect::default_connector();
|
||||
let conn = actix_connect::default_connector();
|
||||
let addr = http::Uri::try_from(format!("https://localhost:{}", srv.port())).unwrap();
|
||||
let con = conn.call(addr.into()).await.unwrap();
|
||||
assert_eq!(con.peer_addr().unwrap(), srv.addr());
|
||||
@ -163,7 +164,7 @@ async fn test_rustls_uri() {
|
||||
})
|
||||
});
|
||||
|
||||
let mut conn = actix_connect::default_connector();
|
||||
let conn = actix_connect::default_connector();
|
||||
let addr = http::Uri::try_from(format!("https://localhost:{}", srv.port())).unwrap();
|
||||
let con = conn.call(addr.into()).await.unwrap();
|
||||
assert_eq!(con.peer_addr().unwrap(), srv.addr());
|
||||
|
@ -37,7 +37,7 @@ where
|
||||
|
||||
actix_service::forward_ready!(inner);
|
||||
|
||||
fn call(&mut self, req: Req) -> Self::Future {
|
||||
fn call(&self, req: Req) -> Self::Future {
|
||||
let span = (self.make_span)(&req);
|
||||
let _enter = span.as_ref().map(|s| s.enter());
|
||||
|
||||
@ -229,7 +229,7 @@ mod test {
|
||||
|
||||
let span_svc = span!(Level::TRACE, "span_svc");
|
||||
let trace_service_factory = trace(service_factory, |_: &&str| Some(span_svc.clone()));
|
||||
let mut service = trace_service_factory.new_service(()).await.unwrap();
|
||||
let service = trace_service_factory.new_service(()).await.unwrap();
|
||||
service.call("boo").await.unwrap();
|
||||
|
||||
let id = span_svc.id().unwrap().into_u64();
|
||||
|
@ -151,7 +151,7 @@ where
|
||||
|
||||
actix_service::forward_ready!(service);
|
||||
|
||||
fn call(&mut self, request: Req) -> Self::Future {
|
||||
fn call(&self, request: Req) -> Self::Future {
|
||||
TimeoutServiceResponse {
|
||||
fut: self.service.call(request),
|
||||
sleep: sleep(self.timeout),
|
||||
@ -213,7 +213,7 @@ mod tests {
|
||||
|
||||
actix_service::always_ready!();
|
||||
|
||||
fn call(&mut self, _: ()) -> Self::Future {
|
||||
fn call(&self, _: ()) -> Self::Future {
|
||||
let sleep = actix_rt::time::sleep(self.0);
|
||||
Box::pin(async move {
|
||||
sleep.await;
|
||||
@ -227,7 +227,7 @@ mod tests {
|
||||
let resolution = Duration::from_millis(100);
|
||||
let wait_time = Duration::from_millis(50);
|
||||
|
||||
let mut timeout = TimeoutService::new(resolution, SleepService(wait_time));
|
||||
let timeout = TimeoutService::new(resolution, SleepService(wait_time));
|
||||
assert_eq!(timeout.call(()).await, Ok(()));
|
||||
}
|
||||
|
||||
@ -236,7 +236,7 @@ mod tests {
|
||||
let resolution = Duration::from_millis(100);
|
||||
let wait_time = Duration::from_millis(500);
|
||||
|
||||
let mut timeout = TimeoutService::new(resolution, SleepService(wait_time));
|
||||
let timeout = TimeoutService::new(resolution, SleepService(wait_time));
|
||||
assert_eq!(timeout.call(()).await, Err(TimeoutError::Timeout));
|
||||
}
|
||||
|
||||
@ -249,7 +249,7 @@ mod tests {
|
||||
Timeout::new(resolution),
|
||||
fn_factory(|| async { Ok::<_, ()>(SleepService(wait_time)) }),
|
||||
);
|
||||
let mut srv = timeout.new_service(&()).await.unwrap();
|
||||
let srv = timeout.new_service(&()).await.unwrap();
|
||||
|
||||
assert_eq!(srv.call(()).await, Err(TimeoutError::Timeout));
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user