1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-01-18 20:01:48 +01:00

add .then() combinator

This commit is contained in:
Nikolay Kim 2018-10-02 21:47:50 -07:00
parent 028279713e
commit 79b525bc2f
3 changed files with 368 additions and 6 deletions

View File

@ -52,11 +52,8 @@ where
type Future = AndThenFuture<A, B>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
match self.a.poll_ready() {
Ok(Async::Ready(_)) => self.b.borrow_mut().poll_ready(),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(err) => Err(err),
}
let _ = try_ready!(self.a.poll_ready());
self.b.borrow_mut().poll_ready()
}
fn call(&mut self, req: Self::Request) -> Self::Future {

View File

@ -10,6 +10,7 @@ mod from_err;
mod map;
mod map_err;
mod map_init_err;
mod then;
pub use self::and_then::{AndThen, AndThenNewService};
pub use self::apply::{Apply, ApplyNewService};
@ -18,6 +19,7 @@ pub use self::from_err::{FromErr, FromErrNewService};
pub use self::map::{Map, MapNewService};
pub use self::map_err::{MapErr, MapErrNewService};
pub use self::map_init_err::MapInitErr;
pub use self::then::{Then, ThenNewService};
/// An extension trait for `Service`s that provides a variety of convenient
/// adapters
@ -69,6 +71,19 @@ pub trait ServiceExt: Service {
FromErr::new(self)
}
/// Chain on a computation for when a call to the service finished,
/// passing the result of the call to the next service `B`.
///
/// Note that this function consumes the receiving future and returns a
/// wrapped version of it.
fn then<B>(self, service: B) -> Then<Self, B>
where
Self: Sized,
B: Service<Request = Result<Self::Response, Self::Error>, Error = Self::Error>,
{
Then::new(self, service)
}
/// Map this service's output to a different type, returning a new service
/// of the resulting type.
///
@ -131,7 +146,8 @@ pub trait NewServiceExt: NewService {
AndThenNewService::new(self, new_service)
}
/// Map this service's error and new service's init error to any error
/// `NewService` that create service to map this service's error
/// and new service's init error to any error
/// implementing `From` for this service`s `Error`.
///
/// Note that this function consumes the receiving new service and returns a
@ -144,6 +160,25 @@ pub trait NewServiceExt: NewService {
FromErrNewService::new(self)
}
/// Create `NewService` to chain on a computation for when a call to the
/// service finished, passing the result of the call to the next
/// service `B`.
///
/// Note that this function consumes the receiving future and returns a
/// wrapped version of it.
fn then<F, B>(self, new_service: F) -> ThenNewService<Self, B>
where
Self: Sized,
F: IntoNewService<B>,
B: NewService<
Request = Result<Self::Response, Self::Error>,
Error = Self::Error,
InitError = Self::InitError,
>,
{
ThenNewService::new(self, new_service)
}
fn map<F, R>(self, f: F) -> MapNewService<Self, F, R>
where
Self: Sized,

330
src/service/then.rs Normal file
View File

@ -0,0 +1,330 @@
use std::cell::RefCell;
use std::rc::Rc;
use futures::{Async, Future, Poll};
use super::{IntoNewService, NewService, Service};
/// Service for the `then` combinator, chaining a computation onto the end of
/// another service.
///
/// This is created by the `ServiceExt::then` method.
pub struct Then<A, B> {
a: A,
b: Rc<RefCell<B>>,
}
impl<A, B> Then<A, B>
where
A: Service,
B: Service<Request = Result<A::Response, A::Error>, Error = A::Error>,
{
/// Create new `Then` combinator
pub fn new(a: A, b: B) -> Then<A, B> {
Then {
a,
b: Rc::new(RefCell::new(b)),
}
}
}
impl<A, B> Clone for Then<A, B>
where
A: Service + Clone,
B: Service<Request = Result<A::Response, A::Error>, Error = A::Error>,
{
fn clone(&self) -> Self {
Then {
a: self.a.clone(),
b: self.b.clone(),
}
}
}
impl<A, B> Service for Then<A, B>
where
A: Service,
B: Service<Request = Result<A::Response, A::Error>, Error = A::Error>,
{
type Request = A::Request;
type Response = B::Response;
type Error = B::Error;
type Future = ThenFuture<A, B>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
let _ = try_ready!(self.a.poll_ready());
self.b.borrow_mut().poll_ready()
}
fn call(&mut self, req: Self::Request) -> Self::Future {
ThenFuture::new(self.a.call(req), self.b.clone())
}
}
pub struct ThenFuture<A, B>
where
A: Service,
B: Service<Request = Result<A::Response, A::Error>>,
{
b: Rc<RefCell<B>>,
fut_b: Option<B::Future>,
fut_a: A::Future,
}
impl<A, B> ThenFuture<A, B>
where
A: Service,
B: Service<Request = Result<A::Response, A::Error>>,
{
fn new(fut_a: A::Future, b: Rc<RefCell<B>>) -> Self {
ThenFuture {
b,
fut_a,
fut_b: None,
}
}
}
impl<A, B> Future for ThenFuture<A, B>
where
A: Service,
B: Service<Request = Result<A::Response, A::Error>>,
{
type Item = B::Response;
type Error = B::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(ref mut fut) = self.fut_b {
return fut.poll();
}
match self.fut_a.poll() {
Ok(Async::Ready(resp)) => {
self.fut_b = Some(self.b.borrow_mut().call(Ok(resp)));
self.poll()
}
Err(err) => {
self.fut_b = Some(self.b.borrow_mut().call(Err(err)));
self.poll()
}
Ok(Async::NotReady) => Ok(Async::NotReady),
}
}
}
/// `ThenNewService` new service combinator
pub struct ThenNewService<A, B> {
a: A,
b: B,
}
impl<A, B> ThenNewService<A, B>
where
A: NewService,
B: NewService,
{
/// Create new `AndThen` combinator
pub fn new<F: IntoNewService<B>>(a: A, f: F) -> Self {
Self {
a,
b: f.into_new_service(),
}
}
}
impl<A, B> NewService for ThenNewService<A, B>
where
A: NewService,
B: NewService<
Request = Result<A::Response, A::Error>,
Error = A::Error,
InitError = A::InitError,
>,
{
type Request = A::Request;
type Response = B::Response;
type Error = A::Error;
type Service = Then<A::Service, B::Service>;
type InitError = A::InitError;
type Future = ThenNewServiceFuture<A, B>;
fn new_service(&self) -> Self::Future {
ThenNewServiceFuture::new(self.a.new_service(), self.b.new_service())
}
}
impl<A, B> Clone for ThenNewService<A, B>
where
A: NewService + Clone,
B: NewService<
Request = Result<A::Response, A::Error>,
Error = A::Error,
InitError = A::InitError,
> + Clone,
{
fn clone(&self) -> Self {
Self {
a: self.a.clone(),
b: self.b.clone(),
}
}
}
pub struct ThenNewServiceFuture<A, B>
where
A: NewService,
B: NewService,
{
fut_b: B::Future,
fut_a: A::Future,
a: Option<A::Service>,
b: Option<B::Service>,
}
impl<A, B> ThenNewServiceFuture<A, B>
where
A: NewService,
B: NewService,
{
fn new(fut_a: A::Future, fut_b: B::Future) -> Self {
ThenNewServiceFuture {
fut_a,
fut_b,
a: None,
b: None,
}
}
}
impl<A, B> Future for ThenNewServiceFuture<A, B>
where
A: NewService,
B: NewService<
Request = Result<A::Response, A::Error>,
Error = A::Error,
InitError = A::InitError,
>,
{
type Item = Then<A::Service, B::Service>;
type Error = A::InitError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if self.a.is_none() {
if let Async::Ready(service) = self.fut_a.poll()? {
self.a = Some(service);
}
}
if self.b.is_none() {
if let Async::Ready(service) = self.fut_b.poll()? {
self.b = Some(service);
}
}
if self.a.is_some() && self.b.is_some() {
Ok(Async::Ready(Then::new(
self.a.take().unwrap(),
self.b.take().unwrap(),
)))
} else {
Ok(Async::NotReady)
}
}
}
#[cfg(test)]
mod tests {
use futures::future::{err, ok, FutureResult};
use futures::{Async, Poll};
use std::cell::Cell;
use std::rc::Rc;
use super::*;
use service::{NewServiceExt, ServiceExt};
struct Srv1(Rc<Cell<usize>>);
impl Service for Srv1 {
type Request = Result<&'static str, &'static str>;
type Response = &'static str;
type Error = ();
type Future = FutureResult<Self::Response, Self::Error>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.0.set(self.0.get() + 1);
Ok(Async::Ready(()))
}
fn call(&mut self, req: Self::Request) -> Self::Future {
match req {
Ok(msg) => ok(msg),
Err(_) => err(()),
}
}
}
#[derive(Clone)]
struct Srv2(Rc<Cell<usize>>);
impl Service for Srv2 {
type Request = Result<&'static str, ()>;
type Response = (&'static str, &'static str);
type Error = ();
type Future = FutureResult<Self::Response, ()>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.0.set(self.0.get() + 1);
Ok(Async::Ready(()))
}
fn call(&mut self, req: Self::Request) -> Self::Future {
match req {
Ok(msg) => ok((msg, "ok")),
Err(()) => ok(("srv2", "err")),
}
}
}
#[test]
fn test_poll_ready() {
let cnt = Rc::new(Cell::new(0));
let mut srv = Srv1(cnt.clone()).then(Srv2(cnt.clone()));
let res = srv.poll_ready();
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready(()));
assert_eq!(cnt.get(), 2);
}
#[test]
fn test_call() {
let cnt = Rc::new(Cell::new(0));
let mut srv = Srv1(cnt.clone()).then(Srv2(cnt));
let res = srv.call(Ok("srv1")).poll();
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready(("srv1", "ok")));
let res = srv.call(Err("srv")).poll();
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready(("srv2", "err")));
}
#[test]
fn test_new_service() {
let cnt = Rc::new(Cell::new(0));
let cnt2 = cnt.clone();
let blank = move || Ok::<_, ()>(Srv1(cnt2.clone()));
let new_srv = blank.into_new_service().then(move || Ok(Srv2(cnt.clone())));
if let Async::Ready(mut srv) = new_srv.new_service().poll().unwrap() {
let res = srv.call(Ok("srv1")).poll();
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready(("srv1", "ok")));
let res = srv.call(Err("srv")).poll();
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready(("srv2", "err")));
} else {
panic!()
}
}
}