From f2ef824011158e6b807c94d652a11c694568f11e Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 23 Aug 2018 22:12:10 -0700 Subject: [PATCH] add NewConfigurableService trait --- src/configurable.rs | 459 ++++++++++++++++++++++++++++++++++++++++++++ src/connector.rs | 8 +- src/lib.rs | 4 +- 3 files changed, 466 insertions(+), 5 deletions(-) create mode 100644 src/configurable.rs diff --git a/src/configurable.rs b/src/configurable.rs new file mode 100644 index 00000000..0a6e5f60 --- /dev/null +++ b/src/configurable.rs @@ -0,0 +1,459 @@ +use std::marker; + +use futures::{future, future::FutureResult, Async, Future, IntoFuture, Poll}; +use tower_service::Service; + +use service::{AndThen, FnService, MapErr}; + +/// Creates new `Service` values. +/// +/// Acts as a service factory. This is useful for cases where new `Service` +/// values must be produced. One case is a TCP servier listener. The listner +/// accepts new TCP streams, obtains a new `Service` value using the +/// `NewConfigurableService` trait, and uses that new `Service` value to +/// process inbound requests on that new TCP stream. +pub trait NewConfigurableService { + /// Requests handled by the service + type Request; + + /// Responses given by the service + type Response; + + /// Errors produced by the service + type Error; + + /// The `Service` value created by this factory + type Service: Service< + Request = Self::Request, + Response = Self::Response, + Error = Self::Error, + >; + + /// Pipeline configuration + type Config; + + /// Errors produced while building a service. + type InitError; + + /// The future of the `Service` instance. + type Future: Future; + + /// Create and return a new service value asynchronously. + fn new_service(&self, Self::Config) -> Self::Future; + + fn and_then(self, new_service: F) -> AndThenNewConfigurableService + where + Self: Sized, + F: IntoNewConfigurableService, + B: NewConfigurableService< + Request = Self::Response, + Error = Self::Error, + Config = Self::Config, + InitError = Self::InitError, + >, + { + AndThenNewConfigurableService::new(self, new_service) + } + + fn map_err(self, f: F) -> MapErrNewConfigurableService + where + Self: Sized, + F: Fn(Self::Error) -> E, + { + MapErrNewConfigurableService::new(self, f) + } + + fn map_init_err(self, f: F) -> MapInitErr + where + Self: Sized, + F: Fn(Self::InitError) -> E, + { + MapInitErr::new(self, f) + } +} + +/// Trait for types that can be converted to a Service +pub trait IntoNewConfigurableService +where + T: NewConfigurableService, +{ + /// Create service + fn into_new_service(self) -> T; +} + +impl IntoNewConfigurableService for T +where + T: NewConfigurableService, +{ + fn into_new_service(self) -> T { + self + } +} + +pub struct FnNewConfigurableService +where + F: Fn(Req) -> Fut, + Fut: IntoFuture, +{ + f: F, + req: marker::PhantomData, + resp: marker::PhantomData, + err: marker::PhantomData, + ierr: marker::PhantomData, + cfg: marker::PhantomData, +} + +impl + FnNewConfigurableService +where + F: Fn(Req) -> Fut + Clone, + Fut: IntoFuture, +{ + fn new(f: F) -> Self { + FnNewConfigurableService { + f, + req: marker::PhantomData, + resp: marker::PhantomData, + err: marker::PhantomData, + ierr: marker::PhantomData, + cfg: marker::PhantomData, + } + } +} + +impl NewConfigurableService + for FnNewConfigurableService +where + F: Fn(Req) -> Fut + Clone, + Fut: IntoFuture, +{ + type Request = Req; + type Response = Resp; + type Error = Err; + type Service = FnService; + type Config = Cfg; + type InitError = IErr; + type Future = FutureResult; + + fn new_service(&self, _: Cfg) -> Self::Future { + future::ok(FnService::new(self.f.clone())) + } +} + +impl + IntoNewConfigurableService> + for F +where + F: Fn(Req) -> Fut + Clone + 'static, + Fut: IntoFuture, +{ + fn into_new_service(self) -> FnNewConfigurableService { + FnNewConfigurableService::new(self) + } +} + +impl Clone + for FnNewConfigurableService +where + F: Fn(Req) -> Fut + Clone, + Fut: IntoFuture, +{ + fn clone(&self) -> Self { + Self::new(self.f.clone()) + } +} + +/// `AndThenNewConfigurableService` new service combinator +pub struct AndThenNewConfigurableService { + a: A, + b: B, +} + +impl AndThenNewConfigurableService +where + A: NewConfigurableService, + B: NewConfigurableService, +{ + /// Create new `AndThen` combinator + pub fn new>(a: A, f: F) -> Self { + Self { + a, + b: f.into_new_service(), + } + } +} + +impl NewConfigurableService for AndThenNewConfigurableService +where + A: NewConfigurableService< + Response = B::Request, + Config = B::Config, + InitError = B::InitError, + >, + A::Config: Clone, + A::Error: Into, + B: NewConfigurableService, +{ + type Request = A::Request; + type Response = B::Response; + type Error = B::Error; + type Service = AndThen; + + type Config = A::Config; + type InitError = A::InitError; + type Future = AndThenNewConfigurableServiceFuture; + + fn new_service(&self, cfg: A::Config) -> Self::Future { + AndThenNewConfigurableServiceFuture::new( + self.a.new_service(cfg.clone()), + self.b.new_service(cfg), + ) + } +} + +impl Clone for AndThenNewConfigurableService +where + A: NewConfigurableService + Clone, + A::Error: Into, + B: NewConfigurableService + Clone, +{ + fn clone(&self) -> Self { + Self { + a: self.a.clone(), + b: self.b.clone(), + } + } +} + +pub struct AndThenNewConfigurableServiceFuture +where + A: NewConfigurableService, + B: NewConfigurableService, +{ + fut_b: B::Future, + fut_a: A::Future, + a: Option, + b: Option, +} + +impl AndThenNewConfigurableServiceFuture +where + A: NewConfigurableService, + B: NewConfigurableService, +{ + fn new(fut_a: A::Future, fut_b: B::Future) -> Self { + AndThenNewConfigurableServiceFuture { + fut_a, + fut_b, + a: None, + b: None, + } + } +} + +impl Future for AndThenNewConfigurableServiceFuture +where + A: NewConfigurableService, + A::Error: Into, + B: NewConfigurableService, +{ + type Item = AndThen; + type Error = B::InitError; + + fn poll(&mut self) -> Poll { + if let Async::Ready(service) = self.fut_a.poll()? { + self.a = Some(service); + } + + if let Async::Ready(service) = self.fut_b.poll()? { + self.b = Some(service); + } + + if self.a.is_some() && self.b.is_some() { + Ok(Async::Ready(AndThen::new( + self.a.take().unwrap(), + self.b.take().unwrap(), + ))) + } else { + Ok(Async::NotReady) + } + } +} + +/// `MapErrNewService` new service combinator +pub struct MapErrNewConfigurableService { + a: A, + f: F, + e: marker::PhantomData, +} + +impl MapErrNewConfigurableService +where + A: NewConfigurableService, + F: Fn(A::Error) -> E, +{ + /// Create new `MapErr` new service instance + pub fn new(a: A, f: F) -> Self { + Self { + a, + f, + e: marker::PhantomData, + } + } +} + +impl Clone for MapErrNewConfigurableService +where + A: NewConfigurableService + Clone, + F: Fn(A::Error) -> E + Clone, +{ + fn clone(&self) -> Self { + Self { + a: self.a.clone(), + f: self.f.clone(), + e: marker::PhantomData, + } + } +} + +impl NewConfigurableService for MapErrNewConfigurableService +where + A: NewConfigurableService + Clone, + F: Fn(A::Error) -> E + Clone, +{ + type Request = A::Request; + type Response = A::Response; + type Error = E; + type Service = MapErr; + + type Config = A::Config; + type InitError = A::InitError; + type Future = MapErrNewConfigurableServiceFuture; + + fn new_service(&self, cfg: Self::Config) -> Self::Future { + MapErrNewConfigurableServiceFuture::new(self.a.new_service(cfg), self.f.clone()) + } +} + +pub struct MapErrNewConfigurableServiceFuture +where + A: NewConfigurableService, + F: Fn(A::Error) -> E, +{ + fut: A::Future, + f: F, +} + +impl MapErrNewConfigurableServiceFuture +where + A: NewConfigurableService, + F: Fn(A::Error) -> E, +{ + fn new(fut: A::Future, f: F) -> Self { + MapErrNewConfigurableServiceFuture { f, fut } + } +} + +impl Future for MapErrNewConfigurableServiceFuture +where + A: NewConfigurableService, + F: Fn(A::Error) -> E + Clone, +{ + type Item = MapErr; + type Error = A::InitError; + + fn poll(&mut self) -> Poll { + if let Async::Ready(service) = self.fut.poll()? { + Ok(Async::Ready(MapErr::new(service, self.f.clone()))) + } else { + Ok(Async::NotReady) + } + } +} + +/// `MapInitErr` service combinator +pub struct MapInitErr { + a: A, + f: F, + e: marker::PhantomData, +} + +impl MapInitErr +where + A: NewConfigurableService, + F: Fn(A::InitError) -> E, +{ + /// Create new `MapInitErr` combinator + pub fn new(a: A, f: F) -> Self { + Self { + a, + f, + e: marker::PhantomData, + } + } +} + +impl Clone for MapInitErr +where + A: NewConfigurableService + Clone, + F: Fn(A::InitError) -> E + Clone, +{ + fn clone(&self) -> Self { + Self { + a: self.a.clone(), + f: self.f.clone(), + e: marker::PhantomData, + } + } +} + +impl NewConfigurableService for MapInitErr +where + A: NewConfigurableService, + F: Fn(A::InitError) -> E + Clone, +{ + type Request = A::Request; + type Response = A::Response; + type Error = A::Error; + type Service = A::Service; + + type Config = A::Config; + type InitError = E; + type Future = MapInitErrFuture; + + fn new_service(&self, cfg: Self::Config) -> Self::Future { + MapInitErrFuture::new(self.a.new_service(cfg), self.f.clone()) + } +} + +pub struct MapInitErrFuture +where + A: NewConfigurableService, + F: Fn(A::InitError) -> E, +{ + f: F, + fut: A::Future, +} + +impl MapInitErrFuture +where + A: NewConfigurableService, + F: Fn(A::InitError) -> E, +{ + fn new(fut: A::Future, f: F) -> Self { + MapInitErrFuture { f, fut } + } +} + +impl Future for MapInitErrFuture +where + A: NewConfigurableService, + F: Fn(A::InitError) -> E, +{ + type Item = A::Service; + type Error = E; + + fn poll(&mut self) -> Poll { + self.fut.poll().map_err(&self.f) + } +} diff --git a/src/connector.rs b/src/connector.rs index 4a5cb298..9a80a984 100644 --- a/src/connector.rs +++ b/src/connector.rs @@ -62,7 +62,7 @@ impl Service for Connector { fn call(&mut self, addr: String) -> Self::Future { ConnectorFuture { fut: ResolveFut::new(addr, 0, &self.resolver), - fut2: None + fut2: None, } } } @@ -78,14 +78,14 @@ impl Future for ConnectorFuture { fn poll(&mut self) -> Poll { if let Some(ref mut fut) = self.fut2 { - return fut.poll() + return fut.poll(); } match self.fut.poll()? { Async::Ready(addrs) => { self.fut2 = Some(TcpConnector::new(addrs)); self.poll() - }, - Async::NotReady => Ok(Async::NotReady) + } + Async::NotReady => Ok(Async::NotReady), } } } diff --git a/src/lib.rs b/src/lib.rs index 4b19d341..c90d0041 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -52,13 +52,15 @@ use actix::Message; pub use tower_service::{NewService, Service}; pub(crate) mod accept; +mod configurable; +mod connector; mod server; mod server_service; pub mod service; pub mod ssl; mod worker; -mod connector; +pub use configurable::NewConfigurableService; pub use connector::{Connector, ConnectorError}; pub use server::Server; pub use service::{IntoNewService, IntoService, NewServiceExt};