1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-06-26 22:07:42 +02:00

move service to separate crate

This commit is contained in:
Nikolay Kim
2018-12-09 09:56:23 -08:00
parent e8aa73a44b
commit 5f37d85f9b
14 changed files with 183 additions and 74 deletions

26
actix-service/Cargo.toml Normal file
View File

@ -0,0 +1,26 @@
[package]
name = "actix-service"
version = "0.1.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix Service"
keywords = ["network", "framework", "async", "futures"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-service/"
categories = ["network-programming", "asynchronous"]
license = "MIT/Apache-2.0"
exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
edition = "2018"
workspace = "../"
[badges]
travis-ci = { repository = "actix/actix-service", branch = "master" }
# appveyor = { repository = "fafhrd91/actix-web-hdy9d" }
codecov = { repository = "actix/actix-service", branch = "master", service = "github" }
[lib]
name = "actix_service"
path = "src/lib.rs"
[dependencies]
futures = "0.1.24"

View File

@ -0,0 +1,292 @@
use futures::{try_ready, Async, Future, Poll};
use super::{IntoNewService, NewService, Service};
use crate::cell::Cell;
/// Service for the `and_then` combinator, chaining a computation onto the end
/// of another service which completes successfully.
///
/// This is created by the `ServiceExt::and_then` method.
pub struct AndThen<A, B> {
a: A,
b: Cell<B>,
}
impl<A, B> AndThen<A, B> {
/// Create new `AndThen` combinator
pub fn new<Request>(a: A, b: B) -> Self
where
A: Service<Request>,
B: Service<A::Response, Error = A::Error>,
{
Self { a, b: Cell::new(b) }
}
}
impl<A, B> Clone for AndThen<A, B>
where
A: Clone,
{
fn clone(&self) -> Self {
AndThen {
a: self.a.clone(),
b: self.b.clone(),
}
}
}
impl<A, B, Request> Service<Request> for AndThen<A, B>
where
A: Service<Request>,
B: Service<A::Response, Error = A::Error>,
{
type Response = B::Response;
type Error = A::Error;
type Future = AndThenFuture<A, B, Request>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
try_ready!(self.a.poll_ready());
self.b.get_mut().poll_ready()
}
fn call(&mut self, req: Request) -> Self::Future {
AndThenFuture::new(self.a.call(req), self.b.clone())
}
}
pub struct AndThenFuture<A, B, Request>
where
A: Service<Request>,
B: Service<A::Response, Error = A::Error>,
{
b: Cell<B>,
fut_b: Option<B::Future>,
fut_a: A::Future,
}
impl<A, B, Request> AndThenFuture<A, B, Request>
where
A: Service<Request>,
B: Service<A::Response, Error = A::Error>,
{
fn new(fut_a: A::Future, b: Cell<B>) -> Self {
AndThenFuture {
b,
fut_a,
fut_b: None,
}
}
}
impl<A, B, Request> Future for AndThenFuture<A, B, Request>
where
A: Service<Request>,
B: Service<A::Response, Error = A::Error>,
{
type Item = B::Response;
type Error = A::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(ref mut fut) = self.fut_b {
return fut.poll();
}
match self.fut_a.poll() {
Ok(Async::Ready(resp)) => {
self.fut_b = Some(self.b.get_mut().call(resp));
self.poll()
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(err) => Err(err),
}
}
}
/// `AndThenNewService` new service combinator
pub struct AndThenNewService<A, B> {
a: A,
b: B,
}
impl<A, B> AndThenNewService<A, B> {
/// Create new `AndThen` combinator
pub fn new<Request, F: IntoNewService<B, A::Response>>(a: A, f: F) -> Self
where
A: NewService<Request>,
B: NewService<A::Response, Error = A::Error, InitError = A::InitError>,
{
Self {
a,
b: f.into_new_service(),
}
}
}
impl<A, B, Request> NewService<Request> for AndThenNewService<A, B>
where
A: NewService<Request>,
B: NewService<A::Response, Error = A::Error, InitError = A::InitError>,
{
type Response = B::Response;
type Error = A::Error;
type Service = AndThen<A::Service, B::Service>;
type InitError = A::InitError;
type Future = AndThenNewServiceFuture<A, B, Request>;
fn new_service(&self) -> Self::Future {
AndThenNewServiceFuture::new(self.a.new_service(), self.b.new_service())
}
}
impl<A, B> Clone for AndThenNewService<A, B>
where
A: Clone,
B: Clone,
{
fn clone(&self) -> Self {
Self {
a: self.a.clone(),
b: self.b.clone(),
}
}
}
pub struct AndThenNewServiceFuture<A, B, Request>
where
A: NewService<Request>,
B: NewService<A::Response>,
{
fut_b: B::Future,
fut_a: A::Future,
a: Option<A::Service>,
b: Option<B::Service>,
}
impl<A, B, Request> AndThenNewServiceFuture<A, B, Request>
where
A: NewService<Request>,
B: NewService<A::Response>,
{
fn new(fut_a: A::Future, fut_b: B::Future) -> Self {
AndThenNewServiceFuture {
fut_a,
fut_b,
a: None,
b: None,
}
}
}
impl<A, B, Request> Future for AndThenNewServiceFuture<A, B, Request>
where
A: NewService<Request>,
B: NewService<A::Response, Error = A::Error, InitError = A::InitError>,
{
type Item = AndThen<A::Service, B::Service>;
type Error = A::InitError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if self.a.is_none() {
if let Async::Ready(service) = self.fut_a.poll()? {
self.a = Some(service);
}
}
if self.b.is_none() {
if let Async::Ready(service) = self.fut_b.poll()? {
self.b = Some(service);
}
}
if self.a.is_some() && self.b.is_some() {
Ok(Async::Ready(AndThen::new(
self.a.take().unwrap(),
self.b.take().unwrap(),
)))
} else {
Ok(Async::NotReady)
}
}
}
#[cfg(test)]
mod tests {
use futures::future::{ok, FutureResult};
use futures::{Async, Poll};
use std::cell::Cell;
use std::rc::Rc;
use super::*;
use crate::{NewService, Service};
struct Srv1(Rc<Cell<usize>>);
impl Service<&'static str> for Srv1 {
type Response = &'static str;
type Error = ();
type Future = FutureResult<Self::Response, ()>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.0.set(self.0.get() + 1);
Ok(Async::Ready(()))
}
fn call(&mut self, req: &'static str) -> Self::Future {
ok(req)
}
}
#[derive(Clone)]
struct Srv2(Rc<Cell<usize>>);
impl Service<&'static str> for Srv2 {
type Response = (&'static str, &'static str);
type Error = ();
type Future = FutureResult<Self::Response, ()>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.0.set(self.0.get() + 1);
Ok(Async::Ready(()))
}
fn call(&mut self, req: &'static str) -> Self::Future {
ok((req, "srv2"))
}
}
#[test]
fn test_poll_ready() {
let cnt = Rc::new(Cell::new(0));
let mut srv = Srv1(cnt.clone()).and_then(Srv2(cnt.clone()));
let res = srv.poll_ready();
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready(()));
assert_eq!(cnt.get(), 2);
}
#[test]
fn test_call() {
let cnt = Rc::new(Cell::new(0));
let mut srv = Srv1(cnt.clone()).and_then(Srv2(cnt));
let res = srv.call("srv1").poll();
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready(("srv1", "srv2")));
}
#[test]
fn test_new_service() {
let cnt = Rc::new(Cell::new(0));
let cnt2 = cnt.clone();
let blank = move || Ok::<_, ()>(Srv1(cnt2.clone()));
let new_srv = blank
.into_new_service()
.and_then(move || Ok(Srv2(cnt.clone())));
if let Async::Ready(mut srv) = new_srv.new_service().poll().unwrap() {
let res = srv.call("srv1").poll();
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready(("srv1", "srv2")));
} else {
panic!()
}
}
}

222
actix-service/src/apply.rs Normal file
View File

@ -0,0 +1,222 @@
use std::marker::PhantomData;
use futures::{Async, Future, IntoFuture, Poll};
use super::{IntoNewService, IntoService, NewService, Service};
/// `Apply` service combinator
pub struct Apply<T, F, In, Out, Request>
where
T: Service<Request>,
{
service: T,
f: F,
r: PhantomData<(In, Out, Request)>,
}
impl<T, F, In, Out, Request> Apply<T, F, In, Out, Request>
where
T: Service<Request>,
F: Fn(In, &mut T) -> Out,
Out: IntoFuture,
{
/// Create new `Apply` combinator
pub fn new<I: IntoService<T, Request>>(service: I, f: F) -> Self {
Self {
service: service.into_service(),
f,
r: PhantomData,
}
}
}
impl<T, F, In, Out, Request> Clone for Apply<T, F, In, Out, Request>
where
T: Service<Request> + Clone,
F: Clone,
{
fn clone(&self) -> Self {
Apply {
service: self.service.clone(),
f: self.f.clone(),
r: PhantomData,
}
}
}
impl<T, F, In, Out, Request> Service<In> for Apply<T, F, In, Out, Request>
where
T: Service<Request, Error = Out::Error>,
F: Fn(In, &mut T) -> Out,
Out: IntoFuture,
{
type Response = Out::Item;
type Error = Out::Error;
type Future = Out::Future;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.service.poll_ready()
}
fn call(&mut self, req: In) -> Self::Future {
(self.f)(req, &mut self.service).into_future()
}
}
/// `ApplyNewService` new service combinator
pub struct ApplyNewService<T, F, In, Out, Request>
where
T: NewService<Request>,
{
service: T,
f: F,
r: PhantomData<(In, Out, Request)>,
}
impl<T, F, In, Out, Request> ApplyNewService<T, F, In, Out, Request>
where
T: NewService<Request>,
F: Fn(In, &mut T::Service) -> Out,
Out: IntoFuture,
{
/// Create new `ApplyNewService` new service instance
pub fn new<F1: IntoNewService<T, Request>>(service: F1, f: F) -> Self {
Self {
f,
service: service.into_new_service(),
r: PhantomData,
}
}
}
impl<T, F, In, Out, Request> Clone for ApplyNewService<T, F, In, Out, Request>
where
T: NewService<Request> + Clone,
F: Fn(Out, &mut T::Service) -> Out + Clone,
Out: IntoFuture,
{
fn clone(&self) -> Self {
Self {
service: self.service.clone(),
f: self.f.clone(),
r: PhantomData,
}
}
}
impl<T, F, In, Out, Request> NewService<In> for ApplyNewService<T, F, In, Out, Request>
where
T: NewService<Request, Error = Out::Error>,
F: Fn(In, &mut T::Service) -> Out + Clone,
Out: IntoFuture,
{
type Response = Out::Item;
type Error = Out::Error;
type Service = Apply<T::Service, F, In, Out, Request>;
type InitError = T::InitError;
type Future = ApplyNewServiceFuture<T, F, In, Out, Request>;
fn new_service(&self) -> Self::Future {
ApplyNewServiceFuture::new(self.service.new_service(), self.f.clone())
}
}
pub struct ApplyNewServiceFuture<T, F, In, Out, Request>
where
T: NewService<Request>,
F: Fn(In, &mut T::Service) -> Out,
Out: IntoFuture,
{
fut: T::Future,
f: Option<F>,
r: PhantomData<(In, Out)>,
}
impl<T, F, In, Out, Request> ApplyNewServiceFuture<T, F, In, Out, Request>
where
T: NewService<Request>,
F: Fn(In, &mut T::Service) -> Out,
Out: IntoFuture,
{
fn new(fut: T::Future, f: F) -> Self {
ApplyNewServiceFuture {
f: Some(f),
fut,
r: PhantomData,
}
}
}
impl<T, F, In, Out, Request> Future for ApplyNewServiceFuture<T, F, In, Out, Request>
where
T: NewService<Request>,
F: Fn(In, &mut T::Service) -> Out,
Out: IntoFuture,
{
type Item = Apply<T::Service, F, In, Out, Request>;
type Error = T::InitError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Async::Ready(service) = self.fut.poll()? {
Ok(Async::Ready(Apply::new(service, self.f.take().unwrap())))
} else {
Ok(Async::NotReady)
}
}
}
#[cfg(test)]
mod tests {
use futures::future::{ok, FutureResult};
use futures::{Async, Future, Poll};
use crate::{IntoNewService, IntoService, NewService, Service};
#[derive(Clone)]
struct Srv;
impl Service<()> for Srv {
type Response = ();
type Error = ();
type Future = FutureResult<(), ()>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, _: ()) -> Self::Future {
ok(())
}
}
#[test]
fn test_call() {
let blank = |req| Ok(req);
let mut srv = blank.into_service().apply(Srv, |req: &'static str, srv| {
srv.call(()).map(move |res| (req, res))
});
assert!(srv.poll_ready().is_ok());
let res = srv.call("srv").poll();
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready(("srv", ())));
}
#[test]
fn test_new_service() {
let blank = || Ok::<_, ()>((|req| Ok(req)).into_service());
let new_srv = blank.into_new_service().apply(
|| Ok(Srv),
|req: &'static str, srv| srv.call(()).map(move |res| (req, res)),
);
if let Async::Ready(mut srv) = new_srv.new_service().poll().unwrap() {
assert!(srv.poll_ready().is_ok());
let res = srv.call("srv").poll();
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready(("srv", ())));
} else {
panic!()
}
}
}

32
actix-service/src/cell.rs Normal file
View File

@ -0,0 +1,32 @@
//! Custom cell impl
use std::{cell::UnsafeCell, fmt, rc::Rc};
pub(crate) struct Cell<T> {
inner: Rc<UnsafeCell<T>>,
}
impl<T> Clone for Cell<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<T: fmt::Debug> fmt::Debug for Cell<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.inner.fmt(f)
}
}
impl<T> Cell<T> {
pub(crate) fn new(inner: T) -> Self {
Self {
inner: Rc::new(UnsafeCell::new(inner)),
}
}
pub(crate) fn get_mut(&mut self) -> &mut T {
unsafe { &mut *self.inner.as_ref().get() }
}
}

View File

@ -0,0 +1,129 @@
use std::marker;
use futures::{
future::{ok, FutureResult},
Async, IntoFuture, Poll,
};
use super::{IntoNewService, IntoService, NewService, Service};
pub struct FnService<F, Req, Resp, E, Fut>
where
F: Fn(Req) -> Fut,
Fut: IntoFuture<Item = Resp, Error = E>,
{
f: F,
_t: marker::PhantomData<(Req, Resp, E)>,
}
impl<F, Req, Resp, E, Fut> FnService<F, Req, Resp, E, Fut>
where
F: Fn(Req) -> Fut,
Fut: IntoFuture<Item = Resp, Error = E>,
{
pub fn new(f: F) -> Self {
FnService {
f,
_t: marker::PhantomData,
}
}
}
impl<F, Req, Resp, E, Fut> Clone for FnService<F, Req, Resp, E, Fut>
where
F: Fn(Req) -> Fut + Clone,
Fut: IntoFuture<Item = Resp, Error = E>,
{
fn clone(&self) -> Self {
FnService {
f: self.f.clone(),
_t: marker::PhantomData,
}
}
}
impl<F, Req, Resp, E, Fut> Service<Req> for FnService<F, Req, Resp, E, Fut>
where
F: Fn(Req) -> Fut,
Fut: IntoFuture<Item = Resp, Error = E>,
{
type Response = Resp;
type Error = E;
type Future = Fut::Future;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, req: Req) -> Self::Future {
(self.f)(req).into_future()
}
}
impl<F, Req, Resp, Err, Fut> IntoService<FnService<F, Req, Resp, Err, Fut>, Req> for F
where
F: Fn(Req) -> Fut + 'static,
Fut: IntoFuture<Item = Resp, Error = Err>,
{
fn into_service(self) -> FnService<F, Req, Resp, Err, Fut> {
FnService::new(self)
}
}
pub struct FnNewService<F, Req, Resp, Err, Fut>
where
F: Fn(Req) -> Fut,
Fut: IntoFuture<Item = Resp, Error = Err>,
{
f: F,
_t: marker::PhantomData<(Req, Resp, Err)>,
}
impl<F, Req, Resp, Err, Fut> FnNewService<F, Req, Resp, Err, Fut>
where
F: Fn(Req) -> Fut + Clone,
Fut: IntoFuture<Item = Resp, Error = Err>,
{
pub fn new(f: F) -> Self {
FnNewService {
f,
_t: marker::PhantomData,
}
}
}
impl<F, Req, Resp, Err, Fut> NewService<Req> for FnNewService<F, Req, Resp, Err, Fut>
where
F: Fn(Req) -> Fut + Clone,
Fut: IntoFuture<Item = Resp, Error = Err>,
{
type Response = Resp;
type Error = Err;
type Service = FnService<F, Req, Resp, Err, Fut>;
type InitError = ();
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self) -> Self::Future {
ok(FnService::new(self.f.clone()))
}
}
impl<F, Req, Resp, Err, Fut> IntoNewService<FnNewService<F, Req, Resp, Err, Fut>, Req> for F
where
F: Fn(Req) -> Fut + Clone + 'static,
Fut: IntoFuture<Item = Resp, Error = Err>,
{
fn into_new_service(self) -> FnNewService<F, Req, Resp, Err, Fut> {
FnNewService::new(self)
}
}
impl<F, Req, Resp, Err, Fut> Clone for FnNewService<F, Req, Resp, Err, Fut>
where
F: Fn(Req) -> Fut + Clone,
Fut: IntoFuture<Item = Resp, Error = Err>,
{
fn clone(&self) -> Self {
Self::new(self.f.clone())
}
}

View File

@ -0,0 +1,216 @@
use std::marker::PhantomData;
use futures::{Async, Future, Poll};
use super::{NewService, Service};
/// Service for the `from_err` combinator, changing the error type of a service.
///
/// This is created by the `ServiceExt::from_err` method.
pub struct FromErr<A, E> {
service: A,
f: PhantomData<E>,
}
impl<A, E> FromErr<A, E> {
pub(crate) fn new<Request>(service: A) -> Self
where
A: Service<Request>,
E: From<A::Error>,
{
FromErr {
service,
f: PhantomData,
}
}
}
impl<A, E> Clone for FromErr<A, E>
where
A: Clone,
{
fn clone(&self) -> Self {
FromErr {
service: self.service.clone(),
f: PhantomData,
}
}
}
impl<A, E, Request> Service<Request> for FromErr<A, E>
where
A: Service<Request>,
E: From<A::Error>,
{
type Response = A::Response;
type Error = E;
type Future = FromErrFuture<A, E, Request>;
fn poll_ready(&mut self) -> Poll<(), E> {
Ok(self.service.poll_ready().map_err(E::from)?)
}
fn call(&mut self, req: Request) -> Self::Future {
FromErrFuture {
fut: self.service.call(req),
f: PhantomData,
}
}
}
pub struct FromErrFuture<A: Service<Request>, E, Request> {
fut: A::Future,
f: PhantomData<E>,
}
impl<A, E, Request> Future for FromErrFuture<A, E, Request>
where
A: Service<Request>,
E: From<A::Error>,
{
type Item = A::Response;
type Error = E;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.fut.poll().map_err(E::from)
}
}
/// NewService for the `from_err` combinator, changing the type of a new
/// service's error.
///
/// This is created by the `NewServiceExt::from_err` method.
pub struct FromErrNewService<A, E> {
a: A,
e: PhantomData<E>,
}
impl<A, E> FromErrNewService<A, E> {
/// Create new `FromErr` new service instance
pub fn new<Request>(a: A) -> Self
where
A: NewService<Request>,
E: From<A::Error>,
{
Self { a, e: PhantomData }
}
}
impl<A, E> Clone for FromErrNewService<A, E>
where
A: Clone,
{
fn clone(&self) -> Self {
Self {
a: self.a.clone(),
e: PhantomData,
}
}
}
impl<A, E, Request> NewService<Request> for FromErrNewService<A, E>
where
A: NewService<Request>,
E: From<A::Error>,
{
type Response = A::Response;
type Error = E;
type Service = FromErr<A::Service, E>;
type InitError = A::InitError;
type Future = FromErrNewServiceFuture<A, E, Request>;
fn new_service(&self) -> Self::Future {
FromErrNewServiceFuture {
fut: self.a.new_service(),
e: PhantomData,
}
}
}
pub struct FromErrNewServiceFuture<A, E, Request>
where
A: NewService<Request>,
E: From<A::Error>,
{
fut: A::Future,
e: PhantomData<E>,
}
impl<A, E, Request> Future for FromErrNewServiceFuture<A, E, Request>
where
A: NewService<Request>,
E: From<A::Error>,
{
type Item = FromErr<A::Service, E>;
type Error = A::InitError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Async::Ready(service) = self.fut.poll()? {
Ok(Async::Ready(FromErr::new(service)))
} else {
Ok(Async::NotReady)
}
}
}
#[cfg(test)]
mod tests {
use futures::future::{err, FutureResult};
use super::*;
use crate::{IntoNewService, NewService, Service};
struct Srv;
impl Service<()> for Srv {
type Response = ();
type Error = ();
type Future = FutureResult<(), ()>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Err(())
}
fn call(&mut self, _: ()) -> Self::Future {
err(())
}
}
#[derive(Debug, PartialEq)]
struct Error;
impl From<()> for Error {
fn from(_: ()) -> Self {
Error
}
}
#[test]
fn test_poll_ready() {
let mut srv = Srv.from_err::<Error>();
let res = srv.poll_ready();
assert!(res.is_err());
assert_eq!(res.err().unwrap(), Error);
}
#[test]
fn test_call() {
let mut srv = Srv.from_err::<Error>();
let res = srv.call(()).poll();
assert!(res.is_err());
assert_eq!(res.err().unwrap(), Error);
}
#[test]
fn test_new_service() {
let blank = || Ok::<_, ()>(Srv);
let new_srv = blank.into_new_service().from_err::<Error>();
if let Async::Ready(mut srv) = new_srv.new_service().poll().unwrap() {
let res = srv.call(()).poll();
assert!(res.is_err());
assert_eq!(res.err().unwrap(), Error);
} else {
panic!()
}
}
}

334
actix-service/src/lib.rs Normal file
View File

@ -0,0 +1,334 @@
use futures::{Future, IntoFuture, Poll};
mod and_then;
mod apply;
mod cell;
mod fn_service;
mod from_err;
mod map;
mod map_err;
mod map_init_err;
mod then;
pub use self::and_then::{AndThen, AndThenNewService};
pub use self::apply::{Apply, ApplyNewService};
pub use self::fn_service::{FnNewService, FnService};
pub use self::from_err::{FromErr, FromErrNewService};
pub use self::map::{Map, MapNewService};
pub use self::map_err::{MapErr, MapErrNewService};
pub use self::map_init_err::MapInitErr;
pub use self::then::{Then, ThenNewService};
/// An asynchronous function from `Request` to a `Response`.
pub trait Service<Request> {
/// Responses given by the service.
type Response;
/// Errors produced by the service.
type Error;
/// The future response value.
type Future: Future<Item = Self::Response, Error = Self::Error>;
/// Returns `Ready` when the service is able to process requests.
///
/// If the service is at capacity, then `NotReady` is returned and the task
/// is notified when the service becomes ready again. This function is
/// expected to be called while on a task.
///
/// This is a **best effort** implementation. False positives are permitted.
/// It is permitted for the service to return `Ready` from a `poll_ready`
/// call and the next invocation of `call` results in an error.
fn poll_ready(&mut self) -> Poll<(), Self::Error>;
/// Process the request and return the response asynchronously.
///
/// This function is expected to be callable off task. As such,
/// implementations should take care to not call `poll_ready`. If the
/// service is at capacity and the request is unable to be handled, the
/// returned `Future` should resolve to an error.
///
/// Calling `call` without calling `poll_ready` is permitted. The
/// implementation must be resilient to this fact.
fn call(&mut self, req: Request) -> Self::Future;
/// Apply function to specified service and use it as a next service in
/// chain.
fn apply<T, I, F, Out, Req>(
self,
service: I,
f: F,
) -> AndThen<Self, Apply<T, F, Self::Response, Out, Req>>
where
Self: Sized,
T: Service<Req, Error = Self::Error>,
I: IntoService<T, Req>,
F: Fn(Self::Response, &mut T) -> Out,
Out: IntoFuture<Error = Self::Error>,
{
self.and_then(Apply::new(service.into_service(), f))
}
/// Call another service after call to this one has resolved successfully.
///
/// This function can be used to chain two services together and ensure that
/// the second service isn't called until call to the fist service have
/// finished. Result of the call to the first service is used as an
/// input parameter for the second service's call.
///
/// Note that this function consumes the receiving service and returns a
/// wrapped version of it.
fn and_then<F, B>(self, service: F) -> AndThen<Self, B>
where
Self: Sized,
F: IntoService<B, Self::Response>,
B: Service<Self::Response, Error = Self::Error>,
{
AndThen::new(self, service.into_service())
}
/// Map this service's error to any error implementing `From` for
/// this service`s `Error`.
///
/// Note that this function consumes the receiving service and returns a
/// wrapped version of it.
fn from_err<E>(self) -> FromErr<Self, E>
where
Self: Sized,
E: From<Self::Error>,
{
FromErr::new(self)
}
/// Chain on a computation for when a call to the service finished,
/// passing the result of the call to the next service `B`.
///
/// Note that this function consumes the receiving service and returns a
/// wrapped version of it.
fn then<B>(self, service: B) -> Then<Self, B>
where
Self: Sized,
B: Service<Result<Self::Response, Self::Error>, Error = Self::Error>,
{
Then::new(self, service)
}
/// Map this service's output to a different type, returning a new service
/// of the resulting type.
///
/// This function is similar to the `Option::map` or `Iterator::map` where
/// it will change the type of the underlying service.
///
/// Note that this function consumes the receiving service and returns a
/// wrapped version of it, similar to the existing `map` methods in the
/// standard library.
fn map<F, R>(self, f: F) -> Map<Self, F, R>
where
Self: Sized,
F: Fn(Self::Response) -> R,
{
Map::new(self, f)
}
/// Map this service's error to a different error, returning a new service.
///
/// This function is similar to the `Result::map_err` where it will change
/// the error type of the underlying service. This is useful for example to
/// ensure that services have the same error type.
///
/// Note that this function consumes the receiving service and returns a
/// wrapped version of it.
fn map_err<F, E>(self, f: F) -> MapErr<Self, F, E>
where
Self: Sized,
F: Fn(Self::Error) -> E,
{
MapErr::new(self, f)
}
}
/// Creates new `Service` values.
///
/// Acts as a service factory. This is useful for cases where new `Service`
/// values must be produced. One case is a TCP servier listener. The listner
/// accepts new TCP streams, obtains a new `Service` value using the
/// `NewService` trait, and uses that new `Service` value to process inbound
/// requests on that new TCP stream.
///
/// Request - request handled by the service
pub trait NewService<Request> {
/// Responses given by the service
type Response;
/// Errors produced by the service
type Error;
/// The `Service` value created by this factory
type Service: Service<Request, Response = Self::Response, Error = Self::Error>;
/// Errors produced while building a service.
type InitError;
/// The future of the `Service` instance.
type Future: Future<Item = Self::Service, Error = Self::InitError>;
/// Create and return a new service value asynchronously.
fn new_service(&self) -> Self::Future;
/// Apply function to specified service and use it as a next service in
/// chain.
fn apply<T, I, F, Out, Req>(
self,
service: I,
f: F,
) -> AndThenNewService<Self, ApplyNewService<T, F, Self::Response, Out, Req>>
where
Self: Sized,
T: NewService<Req, InitError = Self::InitError, Error = Self::Error>,
I: IntoNewService<T, Req>,
F: Fn(Self::Response, &mut T::Service) -> Out + Clone,
Out: IntoFuture<Error = Self::Error>,
{
self.and_then(ApplyNewService::new(service, f))
}
/// Call another service after call to this one has resolved successfully.
fn and_then<F, B>(self, new_service: F) -> AndThenNewService<Self, B>
where
Self: Sized,
F: IntoNewService<B, Self::Response>,
B: NewService<Self::Response, Error = Self::Error, InitError = Self::InitError>,
{
AndThenNewService::new(self, new_service)
}
/// `NewService` that create service to map this service's error
/// and new service's init error to any error
/// implementing `From` for this service`s `Error`.
///
/// Note that this function consumes the receiving new service and returns a
/// wrapped version of it.
fn from_err<E>(self) -> FromErrNewService<Self, E>
where
Self: Sized,
E: From<Self::Error>,
{
FromErrNewService::new(self)
}
/// Create `NewService` to chain on a computation for when a call to the
/// service finished, passing the result of the call to the next
/// service `B`.
///
/// Note that this function consumes the receiving future and returns a
/// wrapped version of it.
fn then<F, B>(self, new_service: F) -> ThenNewService<Self, B>
where
Self: Sized,
F: IntoNewService<B, Result<Self::Response, Self::Error>>,
B: NewService<
Result<Self::Response, Self::Error>,
Error = Self::Error,
InitError = Self::InitError,
>,
{
ThenNewService::new(self, new_service)
}
/// Map this service's output to a different type, returning a new service
/// of the resulting type.
fn map<F, R>(self, f: F) -> MapNewService<Self, F, R>
where
Self: Sized,
F: Fn(Self::Response) -> R,
{
MapNewService::new(self, f)
}
/// Map this service's error to a different error, returning a new service.
fn map_err<F, E>(self, f: F) -> MapErrNewService<Self, F, E>
where
Self: Sized,
F: Fn(Self::Error) -> E,
{
MapErrNewService::new(self, f)
}
/// Map this service's init error to a different error, returning a new service.
fn map_init_err<F, E>(self, f: F) -> MapInitErr<Self, F, E>
where
Self: Sized,
F: Fn(Self::InitError) -> E,
{
MapInitErr::new(self, f)
}
}
impl<'a, S, Request> Service<Request> for &'a mut S
where
S: Service<Request> + 'a,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self) -> Poll<(), S::Error> {
(**self).poll_ready()
}
fn call(&mut self, request: Request) -> S::Future {
(**self).call(request)
}
}
impl<F, R, E, S, Request> NewService<Request> for F
where
F: Fn() -> R,
R: IntoFuture<Item = S, Error = E>,
S: Service<Request>,
{
type Response = S::Response;
type Error = S::Error;
type Service = S;
type InitError = E;
type Future = R::Future;
fn new_service(&self) -> Self::Future {
(*self)().into_future()
}
}
/// Trait for types that can be converted to a `Service`
pub trait IntoService<T, Request>
where
T: Service<Request>,
{
/// Convert to a `Service`
fn into_service(self) -> T;
}
/// Trait for types that can be converted to a Service
pub trait IntoNewService<T, Request>
where
T: NewService<Request>,
{
/// Convert to an `NewService`
fn into_new_service(self) -> T;
}
impl<T, Request> IntoService<T, Request> for T
where
T: Service<Request>,
{
fn into_service(self) -> T {
self
}
}
impl<T, Request> IntoNewService<T, Request> for T
where
T: NewService<Request>,
{
fn into_new_service(self) -> T {
self
}
}

237
actix-service/src/map.rs Normal file
View File

@ -0,0 +1,237 @@
use std::marker::PhantomData;
use futures::{Async, Future, Poll};
use super::{NewService, Service};
/// Service for the `map` combinator, changing the type of a service's response.
///
/// This is created by the `ServiceExt::map` method.
pub struct Map<A, F, Response> {
service: A,
f: F,
_t: PhantomData<Response>,
}
impl<A, F, Response> Map<A, F, Response> {
/// Create new `Map` combinator
pub fn new<Request>(service: A, f: F) -> Self
where
A: Service<Request>,
F: Fn(A::Response) -> Response,
{
Self {
service,
f,
_t: PhantomData,
}
}
}
impl<A, F, Response> Clone for Map<A, F, Response>
where
A: Clone,
F: Clone,
{
fn clone(&self) -> Self {
Map {
service: self.service.clone(),
f: self.f.clone(),
_t: PhantomData,
}
}
}
impl<A, F, Request, Response> Service<Request> for Map<A, F, Response>
where
A: Service<Request>,
F: Fn(A::Response) -> Response + Clone,
{
type Response = Response;
type Error = A::Error;
type Future = MapFuture<A, F, Request, Response>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.service.poll_ready()
}
fn call(&mut self, req: Request) -> Self::Future {
MapFuture::new(self.service.call(req), self.f.clone())
}
}
pub struct MapFuture<A, F, Request, Response>
where
A: Service<Request>,
F: Fn(A::Response) -> Response,
{
f: F,
fut: A::Future,
}
impl<A, F, Request, Response> MapFuture<A, F, Request, Response>
where
A: Service<Request>,
F: Fn(A::Response) -> Response,
{
fn new(fut: A::Future, f: F) -> Self {
MapFuture { f, fut }
}
}
impl<A, F, Request, Response> Future for MapFuture<A, F, Request, Response>
where
A: Service<Request>,
F: Fn(A::Response) -> Response,
{
type Item = Response;
type Error = A::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.fut.poll()? {
Async::Ready(resp) => Ok(Async::Ready((self.f)(resp))),
Async::NotReady => Ok(Async::NotReady),
}
}
}
/// `MapNewService` new service combinator
pub struct MapNewService<A, F, Response> {
a: A,
f: F,
r: PhantomData<Response>,
}
impl<A, F, Response> MapNewService<A, F, Response> {
/// Create new `Map` new service instance
pub fn new<Request>(a: A, f: F) -> Self
where
A: NewService<Request>,
F: Fn(A::Response) -> Response,
{
Self {
a,
f,
r: PhantomData,
}
}
}
impl<A, F, Response> Clone for MapNewService<A, F, Response>
where
A: Clone,
F: Clone,
{
fn clone(&self) -> Self {
Self {
a: self.a.clone(),
f: self.f.clone(),
r: PhantomData,
}
}
}
impl<A, F, Request, Response> NewService<Request> for MapNewService<A, F, Response>
where
A: NewService<Request>,
F: Fn(A::Response) -> Response + Clone,
{
type Response = Response;
type Error = A::Error;
type Service = Map<A::Service, F, Response>;
type InitError = A::InitError;
type Future = MapNewServiceFuture<A, F, Request, Response>;
fn new_service(&self) -> Self::Future {
MapNewServiceFuture::new(self.a.new_service(), self.f.clone())
}
}
pub struct MapNewServiceFuture<A, F, Request, Response>
where
A: NewService<Request>,
F: Fn(A::Response) -> Response,
{
fut: A::Future,
f: Option<F>,
}
impl<A, F, Request, Response> MapNewServiceFuture<A, F, Request, Response>
where
A: NewService<Request>,
F: Fn(A::Response) -> Response,
{
fn new(fut: A::Future, f: F) -> Self {
MapNewServiceFuture { f: Some(f), fut }
}
}
impl<A, F, Request, Response> Future for MapNewServiceFuture<A, F, Request, Response>
where
A: NewService<Request>,
F: Fn(A::Response) -> Response,
{
type Item = Map<A::Service, F, Response>;
type Error = A::InitError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Async::Ready(service) = self.fut.poll()? {
Ok(Async::Ready(Map::new(service, self.f.take().unwrap())))
} else {
Ok(Async::NotReady)
}
}
}
#[cfg(test)]
mod tests {
use futures::future::{ok, FutureResult};
use super::*;
use crate::{IntoNewService, Service};
struct Srv;
impl Service<()> for Srv {
type Response = ();
type Error = ();
type Future = FutureResult<(), ()>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, _: ()) -> Self::Future {
ok(())
}
}
#[test]
fn test_poll_ready() {
let mut srv = Srv.map(|_| "ok");
let res = srv.poll_ready();
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready(()));
}
#[test]
fn test_call() {
let mut srv = Srv.map(|_| "ok");
let res = srv.call(()).poll();
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready("ok"));
}
#[test]
fn test_new_service() {
let blank = || Ok::<_, ()>(Srv);
let new_srv = blank.into_new_service().map(|_| "ok");
if let Async::Ready(mut srv) = new_srv.new_service().poll().unwrap() {
let res = srv.call(()).poll();
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready("ok"));
} else {
panic!()
}
}
}

View File

@ -0,0 +1,239 @@
use std::marker::PhantomData;
use futures::{Async, Future, Poll};
use super::{NewService, Service};
/// Service for the `map_err` combinator, changing the type of a service's
/// error.
///
/// This is created by the `ServiceExt::map_err` method.
pub struct MapErr<A, F, E> {
service: A,
f: F,
_t: PhantomData<E>,
}
impl<A, F, E> MapErr<A, F, E> {
/// Create new `MapErr` combinator
pub fn new<Request>(service: A, f: F) -> Self
where
A: Service<Request>,
F: Fn(A::Error) -> E,
{
Self {
service,
f,
_t: PhantomData,
}
}
}
impl<A, F, E> Clone for MapErr<A, F, E>
where
A: Clone,
F: Clone,
{
fn clone(&self) -> Self {
MapErr {
service: self.service.clone(),
f: self.f.clone(),
_t: PhantomData,
}
}
}
impl<A, F, E, Request> Service<Request> for MapErr<A, F, E>
where
A: Service<Request>,
F: Fn(A::Error) -> E + Clone,
{
type Response = A::Response;
type Error = E;
type Future = MapErrFuture<A, F, E, Request>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.service.poll_ready().map_err(&self.f)
}
fn call(&mut self, req: Request) -> Self::Future {
MapErrFuture::new(self.service.call(req), self.f.clone())
}
}
pub struct MapErrFuture<A, F, E, Request>
where
A: Service<Request>,
F: Fn(A::Error) -> E,
{
f: F,
fut: A::Future,
}
impl<A, F, E, Request> MapErrFuture<A, F, E, Request>
where
A: Service<Request>,
F: Fn(A::Error) -> E,
{
fn new(fut: A::Future, f: F) -> Self {
MapErrFuture { f, fut }
}
}
impl<A, F, E, Request> Future for MapErrFuture<A, F, E, Request>
where
A: Service<Request>,
F: Fn(A::Error) -> E,
{
type Item = A::Response;
type Error = E;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.fut.poll().map_err(&self.f)
}
}
/// NewService for the `map_err` combinator, changing the type of a new
/// service's error.
///
/// This is created by the `NewServiceExt::map_err` method.
pub struct MapErrNewService<A, F, E> {
a: A,
f: F,
e: PhantomData<E>,
}
impl<A, F, E> MapErrNewService<A, F, E> {
/// Create new `MapErr` new service instance
pub fn new<Request>(a: A, f: F) -> Self
where
A: NewService<Request>,
F: Fn(A::Error) -> E,
{
Self {
a,
f,
e: PhantomData,
}
}
}
impl<A, F, E> Clone for MapErrNewService<A, F, E>
where
A: Clone,
F: Clone,
{
fn clone(&self) -> Self {
Self {
a: self.a.clone(),
f: self.f.clone(),
e: PhantomData,
}
}
}
impl<A, F, E, Request> NewService<Request> for MapErrNewService<A, F, E>
where
A: NewService<Request>,
F: Fn(A::Error) -> E + Clone,
{
type Response = A::Response;
type Error = E;
type Service = MapErr<A::Service, F, E>;
type InitError = A::InitError;
type Future = MapErrNewServiceFuture<A, F, E, Request>;
fn new_service(&self) -> Self::Future {
MapErrNewServiceFuture::new(self.a.new_service(), self.f.clone())
}
}
pub struct MapErrNewServiceFuture<A, F, E, Request>
where
A: NewService<Request>,
F: Fn(A::Error) -> E,
{
fut: A::Future,
f: F,
}
impl<A, F, E, Request> MapErrNewServiceFuture<A, F, E, Request>
where
A: NewService<Request>,
F: Fn(A::Error) -> E,
{
fn new(fut: A::Future, f: F) -> Self {
MapErrNewServiceFuture { f, fut }
}
}
impl<A, F, E, Request> Future for MapErrNewServiceFuture<A, F, E, Request>
where
A: NewService<Request>,
F: Fn(A::Error) -> E + Clone,
{
type Item = MapErr<A::Service, F, E>;
type Error = A::InitError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Async::Ready(service) = self.fut.poll()? {
Ok(Async::Ready(MapErr::new(service, self.f.clone())))
} else {
Ok(Async::NotReady)
}
}
}
#[cfg(test)]
mod tests {
use futures::future::{err, FutureResult};
use super::*;
use crate::{IntoNewService, NewService, Service};
struct Srv;
impl Service<()> for Srv {
type Response = ();
type Error = ();
type Future = FutureResult<(), ()>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Err(())
}
fn call(&mut self, _: ()) -> Self::Future {
err(())
}
}
#[test]
fn test_poll_ready() {
let mut srv = Srv.map_err(|_| "error");
let res = srv.poll_ready();
assert!(res.is_err());
assert_eq!(res.err().unwrap(), "error");
}
#[test]
fn test_call() {
let mut srv = Srv.map_err(|_| "error");
let res = srv.call(()).poll();
assert!(res.is_err());
assert_eq!(res.err().unwrap(), "error");
}
#[test]
fn test_new_service() {
let blank = || Ok::<_, ()>(Srv);
let new_srv = blank.into_new_service().map_err(|_| "error");
if let Async::Ready(mut srv) = new_srv.new_service().poll().unwrap() {
let res = srv.call(()).poll();
assert!(res.is_err());
assert_eq!(res.err().unwrap(), "error");
} else {
panic!()
}
}
}

View File

@ -0,0 +1,90 @@
use std::marker::PhantomData;
use futures::{Future, Poll};
use super::NewService;
/// `MapInitErr` service combinator
pub struct MapInitErr<A, F, E> {
a: A,
f: F,
e: PhantomData<E>,
}
impl<A, F, E> MapInitErr<A, F, E> {
/// Create new `MapInitErr` combinator
pub fn new<Request>(a: A, f: F) -> Self
where
A: NewService<Request>,
F: Fn(A::InitError) -> E,
{
Self {
a,
f,
e: PhantomData,
}
}
}
impl<A, F, E> Clone for MapInitErr<A, F, E>
where
A: Clone,
F: Clone,
{
fn clone(&self) -> Self {
Self {
a: self.a.clone(),
f: self.f.clone(),
e: PhantomData,
}
}
}
impl<A, F, E, Request> NewService<Request> for MapInitErr<A, F, E>
where
A: NewService<Request>,
F: Fn(A::InitError) -> E + Clone,
{
type Response = A::Response;
type Error = A::Error;
type Service = A::Service;
type InitError = E;
type Future = MapInitErrFuture<A, F, E, Request>;
fn new_service(&self) -> Self::Future {
MapInitErrFuture::new(self.a.new_service(), self.f.clone())
}
}
pub struct MapInitErrFuture<A, F, E, Request>
where
A: NewService<Request>,
F: Fn(A::InitError) -> E,
{
f: F,
fut: A::Future,
}
impl<A, F, E, Request> MapInitErrFuture<A, F, E, Request>
where
A: NewService<Request>,
F: Fn(A::InitError) -> E,
{
fn new(fut: A::Future, f: F) -> Self {
MapInitErrFuture { f, fut }
}
}
impl<A, F, E, Request> Future for MapInitErrFuture<A, F, E, Request>
where
A: NewService<Request>,
F: Fn(A::InitError) -> E,
{
type Item = A::Service;
type Error = E;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.fut.poll().map_err(&self.f)
}
}

312
actix-service/src/then.rs Normal file
View File

@ -0,0 +1,312 @@
use futures::{try_ready, Async, Future, Poll};
use super::{IntoNewService, NewService, Service};
use crate::cell::Cell;
/// Service for the `then` combinator, chaining a computation onto the end of
/// another service.
///
/// This is created by the `ServiceExt::then` method.
pub struct Then<A, B> {
a: A,
b: Cell<B>,
}
impl<A, B> Then<A, B> {
/// Create new `Then` combinator
pub fn new<Request>(a: A, b: B) -> Then<A, B>
where
A: Service<Request>,
B: Service<Result<A::Response, A::Error>, Error = A::Error>,
{
Then { a, b: Cell::new(b) }
}
}
impl<A, B> Clone for Then<A, B>
where
A: Clone,
{
fn clone(&self) -> Self {
Then {
a: self.a.clone(),
b: self.b.clone(),
}
}
}
impl<A, B, Request> Service<Request> for Then<A, B>
where
A: Service<Request>,
B: Service<Result<A::Response, A::Error>, Error = A::Error>,
{
type Response = B::Response;
type Error = B::Error;
type Future = ThenFuture<A, B, Request>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
try_ready!(self.a.poll_ready());
self.b.get_mut().poll_ready()
}
fn call(&mut self, req: Request) -> Self::Future {
ThenFuture::new(self.a.call(req), self.b.clone())
}
}
pub struct ThenFuture<A, B, Request>
where
A: Service<Request>,
B: Service<Result<A::Response, A::Error>>,
{
b: Cell<B>,
fut_b: Option<B::Future>,
fut_a: A::Future,
}
impl<A, B, Request> ThenFuture<A, B, Request>
where
A: Service<Request>,
B: Service<Result<A::Response, A::Error>>,
{
fn new(fut_a: A::Future, b: Cell<B>) -> Self {
ThenFuture {
b,
fut_a,
fut_b: None,
}
}
}
impl<A, B, Request> Future for ThenFuture<A, B, Request>
where
A: Service<Request>,
B: Service<Result<A::Response, A::Error>>,
{
type Item = B::Response;
type Error = B::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(ref mut fut) = self.fut_b {
return fut.poll();
}
match self.fut_a.poll() {
Ok(Async::Ready(resp)) => {
self.fut_b = Some(self.b.get_mut().call(Ok(resp)));
self.poll()
}
Err(err) => {
self.fut_b = Some(self.b.get_mut().call(Err(err)));
self.poll()
}
Ok(Async::NotReady) => Ok(Async::NotReady),
}
}
}
/// `ThenNewService` new service combinator
pub struct ThenNewService<A, B> {
a: A,
b: B,
}
impl<A, B> ThenNewService<A, B> {
/// Create new `AndThen` combinator
pub fn new<F, Request>(a: A, f: F) -> Self
where
A: NewService<Request>,
B: NewService<
Result<A::Response, A::Error>,
Error = A::Error,
InitError = A::InitError,
>,
F: IntoNewService<B, Result<A::Response, A::Error>>,
{
Self {
a,
b: f.into_new_service(),
}
}
}
impl<A, B, Request> NewService<Request> for ThenNewService<A, B>
where
A: NewService<Request>,
B: NewService<Result<A::Response, A::Error>, Error = A::Error, InitError = A::InitError>,
{
type Response = B::Response;
type Error = A::Error;
type Service = Then<A::Service, B::Service>;
type InitError = A::InitError;
type Future = ThenNewServiceFuture<A, B, Request>;
fn new_service(&self) -> Self::Future {
ThenNewServiceFuture::new(self.a.new_service(), self.b.new_service())
}
}
impl<A, B> Clone for ThenNewService<A, B>
where
A: Clone,
B: Clone,
{
fn clone(&self) -> Self {
Self {
a: self.a.clone(),
b: self.b.clone(),
}
}
}
pub struct ThenNewServiceFuture<A, B, Request>
where
A: NewService<Request>,
B: NewService<Result<A::Response, A::Error>, Error = A::Error, InitError = A::InitError>,
{
fut_b: B::Future,
fut_a: A::Future,
a: Option<A::Service>,
b: Option<B::Service>,
}
impl<A, B, Request> ThenNewServiceFuture<A, B, Request>
where
A: NewService<Request>,
B: NewService<Result<A::Response, A::Error>, Error = A::Error, InitError = A::InitError>,
{
fn new(fut_a: A::Future, fut_b: B::Future) -> Self {
ThenNewServiceFuture {
fut_a,
fut_b,
a: None,
b: None,
}
}
}
impl<A, B, Request> Future for ThenNewServiceFuture<A, B, Request>
where
A: NewService<Request>,
B: NewService<Result<A::Response, A::Error>, Error = A::Error, InitError = A::InitError>,
{
type Item = Then<A::Service, B::Service>;
type Error = A::InitError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if self.a.is_none() {
if let Async::Ready(service) = self.fut_a.poll()? {
self.a = Some(service);
}
}
if self.b.is_none() {
if let Async::Ready(service) = self.fut_b.poll()? {
self.b = Some(service);
}
}
if self.a.is_some() && self.b.is_some() {
Ok(Async::Ready(Then::new(
self.a.take().unwrap(),
self.b.take().unwrap(),
)))
} else {
Ok(Async::NotReady)
}
}
}
#[cfg(test)]
mod tests {
use futures::future::{err, ok, FutureResult};
use futures::{Async, Poll};
use std::cell::Cell;
use std::rc::Rc;
use super::*;
#[derive(Clone)]
struct Srv1(Rc<Cell<usize>>);
impl Service<Result<&'static str, &'static str>> for Srv1 {
type Response = &'static str;
type Error = ();
type Future = FutureResult<Self::Response, Self::Error>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.0.set(self.0.get() + 1);
Ok(Async::Ready(()))
}
fn call(&mut self, req: Result<&'static str, &'static str>) -> Self::Future {
match req {
Ok(msg) => ok(msg),
Err(_) => err(()),
}
}
}
struct Srv2(Rc<Cell<usize>>);
impl Service<Result<&'static str, ()>> for Srv2 {
type Response = (&'static str, &'static str);
type Error = ();
type Future = FutureResult<Self::Response, ()>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.0.set(self.0.get() + 1);
Ok(Async::Ready(()))
}
fn call(&mut self, req: Result<&'static str, ()>) -> Self::Future {
match req {
Ok(msg) => ok((msg, "ok")),
Err(()) => ok(("srv2", "err")),
}
}
}
#[test]
fn test_poll_ready() {
let cnt = Rc::new(Cell::new(0));
let mut srv = Srv1(cnt.clone()).then(Srv2(cnt.clone()));
let res = srv.poll_ready();
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready(()));
assert_eq!(cnt.get(), 2);
}
#[test]
fn test_call() {
let cnt = Rc::new(Cell::new(0));
let mut srv = Srv1(cnt.clone()).then(Srv2(cnt)).clone();
let res = srv.call(Ok("srv1")).poll();
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready(("srv1", "ok")));
let res = srv.call(Err("srv")).poll();
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready(("srv2", "err")));
}
#[test]
fn test_new_service() {
let cnt = Rc::new(Cell::new(0));
let cnt2 = cnt.clone();
let blank = move || Ok::<_, ()>(Srv1(cnt2.clone()));
let new_srv = blank.into_new_service().then(move || Ok(Srv2(cnt.clone())));
if let Async::Ready(mut srv) = new_srv.clone().new_service().poll().unwrap() {
let res = srv.call(Ok("srv1")).poll();
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready(("srv1", "ok")));
let res = srv.call(Err("srv")).poll();
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready(("srv2", "err")));
} else {
panic!()
}
}
}