diff --git a/src/configurable.rs b/src/configurable.rs index 6be5c249..8bce29bf 100644 --- a/src/configurable.rs +++ b/src/configurable.rs @@ -110,19 +110,18 @@ where Fut: IntoFuture, { fn new(f: F) -> Self { - Fn2NewConfigurableService{ + Fn2NewConfigurableService { f, err: marker::PhantomData, cfg: marker::PhantomData, fut: marker::PhantomData, - s: marker::PhantomData + s: marker::PhantomData, } } } impl - IntoNewConfigurableService> - for F + IntoNewConfigurableService> for F where S: Service, F: Fn(Cfg) -> Fut + 'static, @@ -144,7 +143,8 @@ where } } -impl NewConfigurableService for Fn2NewConfigurableService +impl NewConfigurableService + for Fn2NewConfigurableService where S: Service, F: Fn(Cfg) -> Fut, diff --git a/src/lib.rs b/src/lib.rs index 1abeff3c..5f2faaca 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -60,7 +60,7 @@ pub mod service; pub mod ssl; mod worker; -pub use configurable::{NewConfigurableService, IntoNewConfigurableService}; +pub use configurable::{IntoNewConfigurableService, NewConfigurableService}; pub use connector::{Connector, ConnectorError}; pub use server::Server; pub use service::{IntoNewService, IntoService, NewServiceExt}; diff --git a/src/service.rs b/src/service.rs deleted file mode 100644 index b655f0e7..00000000 --- a/src/service.rs +++ /dev/null @@ -1,795 +0,0 @@ -use std::cell::RefCell; -use std::marker; -use std::rc::Rc; - -use futures::{future, future::FutureResult, Async, Future, IntoFuture, Poll}; -use tower_service::{NewService, Service}; - -pub trait NewServiceExt: NewService { - fn and_then(self, new_service: F) -> AndThenNewService - where - Self: Sized, - F: IntoNewService, - B: NewService< - Request = Self::Response, - Error = Self::Error, - InitError = Self::InitError, - >, - { - AndThenNewService::new(self, new_service) - } - - fn map_err(self, f: F) -> MapErrNewService - where - Self: Sized, - F: Fn(Self::Error) -> E, - { - MapErrNewService::new(self, f) - } - - fn map_init_err(self, f: F) -> MapInitErr - where - Self: Sized, - F: Fn(Self::InitError) -> E, - { - MapInitErr::new(self, f) - } -} - -impl NewServiceExt for T {} - -/// Trait for types that can be converted to a Service -pub trait IntoService -where - T: Service, -{ - /// Create service - fn into(self) -> T; -} - -/// Trait for types that can be converted to a Service -pub trait IntoNewService -where - T: NewService, -{ - /// Create service - fn into_new_service(self) -> T; -} - -impl IntoService for T -where - T: Service, -{ - fn into(self) -> T { - self - } -} - -impl IntoNewService for T -where - T: NewService, -{ - fn into_new_service(self) -> T { - self - } -} - -impl IntoService> for F -where - F: Fn(Req) -> Fut + 'static, - Fut: IntoFuture, -{ - fn into(self) -> FnService { - FnService::new(self) - } -} - -pub struct FnService -where - F: Fn(Req) -> Fut, - Fut: IntoFuture, -{ - f: F, - req: marker::PhantomData, - resp: marker::PhantomData, - err: marker::PhantomData, -} - -impl FnService -where - F: Fn(Req) -> Fut, - Fut: IntoFuture, -{ - pub fn new(f: F) -> Self { - FnService { - f, - req: marker::PhantomData, - resp: marker::PhantomData, - err: marker::PhantomData, - } - } -} - -impl Service for FnService -where - F: Fn(Req) -> Fut, - Fut: IntoFuture, -{ - type Request = Req; - type Response = Resp; - type Error = E; - type Future = Fut::Future; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) - } - - fn call(&mut self, req: Req) -> Self::Future { - (self.f)(req).into_future() - } -} - -pub struct FnNewService -where - F: Fn(Req) -> Fut, - Fut: IntoFuture, -{ - f: F, - req: marker::PhantomData, - resp: marker::PhantomData, - err: marker::PhantomData, - ierr: marker::PhantomData, -} - -impl FnNewService -where - F: Fn(Req) -> Fut + Clone, - Fut: IntoFuture, -{ - fn new(f: F) -> Self { - FnNewService { - f, - req: marker::PhantomData, - resp: marker::PhantomData, - err: marker::PhantomData, - ierr: marker::PhantomData, - } - } -} - -impl NewService for FnNewService -where - F: Fn(Req) -> Fut + Clone, - Fut: IntoFuture, -{ - type Request = Req; - type Response = Resp; - type Error = Err; - type Service = FnService; - type InitError = IErr; - type Future = FutureResult; - - fn new_service(&self) -> Self::Future { - future::ok(FnService::new(self.f.clone())) - } -} - -impl IntoNewService> - for F -where - F: Fn(Req) -> Fut + Clone + 'static, - Fut: IntoFuture, -{ - fn into_new_service(self) -> FnNewService { - FnNewService::new(self) - } -} - -impl Clone for FnNewService -where - F: Fn(Req) -> Fut + Clone, - Fut: IntoFuture, -{ - fn clone(&self) -> Self { - Self::new(self.f.clone()) - } -} - -pub struct FnStateService -where - F: Fn(&mut S, Req) -> Fut, - Fut: IntoFuture, -{ - f: F, - state: S, - req: marker::PhantomData, - resp: marker::PhantomData, - err: marker::PhantomData, -} - -impl FnStateService -where - F: Fn(&mut S, Req) -> Fut, - Fut: IntoFuture, -{ - pub fn new(state: S, f: F) -> Self { - FnStateService { - f, - state, - req: marker::PhantomData, - resp: marker::PhantomData, - err: marker::PhantomData, - } - } -} - -impl Service for FnStateService -where - F: Fn(&mut S, Req) -> Fut, - Fut: IntoFuture, -{ - type Request = Req; - type Response = Resp; - type Error = Err; - type Future = Fut::Future; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) - } - - fn call(&mut self, req: Req) -> Self::Future { - (self.f)(&mut self.state, req).into_future() - } -} - -/// `NewService` for state and handler functions -pub struct FnStateNewService { - f: F1, - state: F2, - s: marker::PhantomData, - req: marker::PhantomData, - resp: marker::PhantomData, - err1: marker::PhantomData, - err2: marker::PhantomData, - fut1: marker::PhantomData, - fut2: marker::PhantomData, -} - -impl - FnStateNewService -{ - fn new(f: F1, state: F2) -> Self { - FnStateNewService { - f, - state, - s: marker::PhantomData, - req: marker::PhantomData, - resp: marker::PhantomData, - err1: marker::PhantomData, - err2: marker::PhantomData, - fut1: marker::PhantomData, - fut2: marker::PhantomData, - } - } -} - -impl NewService - for FnStateNewService -where - S: 'static, - F1: Fn(&mut S, Req) -> Fut1 + Clone + 'static, - F2: Fn() -> Fut2, - Fut1: IntoFuture + 'static, - Fut2: IntoFuture + 'static, - Req: 'static, - Resp: 'static, - Err1: 'static, - Err2: 'static, -{ - type Request = Req; - type Response = Resp; - type Error = Err1; - type Service = FnStateService; - type InitError = Err2; - type Future = Box>; - - fn new_service(&self) -> Self::Future { - let f = self.f.clone(); - Box::new( - (self.state)() - .into_future() - .and_then(move |state| Ok(FnStateService::new(state, f))), - ) - } -} - -impl - IntoNewService> for (F1, F2) -where - S: 'static, - F1: Fn(&mut S, Req) -> Fut1 + Clone + 'static, - F2: Fn() -> Fut2, - Fut1: IntoFuture + 'static, - Fut2: IntoFuture + 'static, - Req: 'static, - Resp: 'static, - Err1: 'static, - Err2: 'static, -{ - fn into_new_service( - self, - ) -> FnStateNewService { - FnStateNewService::new(self.0, self.1) - } -} - -impl Clone - for FnStateNewService -where - F1: Fn(&mut S, Req) -> Fut1 + Clone + 'static, - F2: Fn() -> Fut2 + Clone, - Fut1: IntoFuture, - Fut2: IntoFuture, -{ - fn clone(&self) -> Self { - Self::new(self.f.clone(), self.state.clone()) - } -} - -/// `AndThen` service combinator -pub struct AndThen { - a: A, - b: Rc>, -} - -impl AndThen -where - A: Service, - A::Error: Into, - B: Service, -{ - /// Create new `AndThen` combinator - pub fn new(a: A, b: B) -> Self { - Self { - a, - b: Rc::new(RefCell::new(b)), - } - } -} - -impl Service for AndThen -where - A: Service, - A::Error: Into, - B: Service, -{ - type Request = A::Request; - type Response = B::Response; - type Error = B::Error; - type Future = AndThenFuture; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - match self.a.poll_ready() { - Ok(Async::Ready(_)) => self.b.borrow_mut().poll_ready(), - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(err) => Err(err.into()), - } - } - - fn call(&mut self, req: Self::Request) -> Self::Future { - AndThenFuture::new(self.a.call(req), self.b.clone()) - } -} - -pub struct AndThenFuture -where - A: Service, - A::Error: Into, - B: Service, -{ - b: Rc>, - fut_b: Option, - fut_a: A::Future, -} - -impl AndThenFuture -where - A: Service, - A::Error: Into, - B: Service, -{ - fn new(fut_a: A::Future, b: Rc>) -> Self { - AndThenFuture { - b, - fut_a, - fut_b: None, - } - } -} - -impl Future for AndThenFuture -where - A: Service, - A::Error: Into, - B: Service, -{ - type Item = B::Response; - type Error = B::Error; - - fn poll(&mut self) -> Poll { - if let Some(ref mut fut) = self.fut_b { - return fut.poll(); - } - - match self.fut_a.poll() { - Ok(Async::Ready(resp)) => { - self.fut_b = Some(self.b.borrow_mut().call(resp)); - self.poll() - } - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(err) => Err(err.into()), - } - } -} - -/// `AndThenNewService` new service combinator -pub struct AndThenNewService { - a: A, - b: B, -} - -impl AndThenNewService -where - A: NewService, - B: NewService, -{ - /// Create new `AndThen` combinator - pub fn new>(a: A, f: F) -> Self { - Self { - a, - b: f.into_new_service(), - } - } -} - -impl NewService for AndThenNewService -where - A: NewService, - A::Error: Into, - B: NewService, -{ - type Request = A::Request; - type Response = B::Response; - type Error = B::Error; - type Service = AndThen; - - type InitError = A::InitError; - type Future = AndThenNewServiceFuture; - - fn new_service(&self) -> Self::Future { - AndThenNewServiceFuture::new(self.a.new_service(), self.b.new_service()) - } -} - -impl Clone for AndThenNewService -where - A: NewService + Clone, - A::Error: Into, - B: NewService + Clone, -{ - fn clone(&self) -> Self { - Self { - a: self.a.clone(), - b: self.b.clone(), - } - } -} - -pub struct AndThenNewServiceFuture -where - A: NewService, - B: NewService, -{ - fut_b: B::Future, - fut_a: A::Future, - a: Option, - b: Option, -} - -impl AndThenNewServiceFuture -where - A: NewService, - B: NewService, -{ - fn new(fut_a: A::Future, fut_b: B::Future) -> Self { - AndThenNewServiceFuture { - fut_a, - fut_b, - a: None, - b: None, - } - } -} - -impl Future for AndThenNewServiceFuture -where - A: NewService, - A::Error: Into, - B: NewService, -{ - type Item = AndThen; - type Error = B::InitError; - - fn poll(&mut self) -> Poll { - if self.a.is_none() { - if let Async::Ready(service) = self.fut_a.poll()? { - self.a = Some(service); - } - } - - if self.b.is_none() { - if let Async::Ready(service) = self.fut_b.poll()? { - self.b = Some(service); - } - } - - if self.a.is_some() && self.b.is_some() { - Ok(Async::Ready(AndThen::new( - self.a.take().unwrap(), - self.b.take().unwrap(), - ))) - } else { - Ok(Async::NotReady) - } - } -} - -/// `MapErr` service combinator -pub struct MapErr { - a: A, - f: F, - e: marker::PhantomData, -} - -impl MapErr -where - A: Service, - F: Fn(A::Error) -> E, -{ - /// Create new `MapErr` combinator - pub fn new(a: A, f: F) -> Self { - Self { - a, - f, - e: marker::PhantomData, - } - } -} - -impl Service for MapErr -where - A: Service, - F: Fn(A::Error) -> E, - F: Clone, -{ - type Request = A::Request; - type Response = A::Response; - type Error = E; - type Future = MapErrFuture; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.a.poll_ready().map_err(&self.f) - } - - fn call(&mut self, req: Self::Request) -> Self::Future { - MapErrFuture::new(self.a.call(req), self.f.clone()) - } -} - -pub struct MapErrFuture -where - A: Service, - F: Fn(A::Error) -> E, -{ - f: F, - fut: A::Future, -} - -impl MapErrFuture -where - A: Service, - F: Fn(A::Error) -> E, -{ - fn new(fut: A::Future, f: F) -> Self { - MapErrFuture { f, fut } - } -} - -impl Future for MapErrFuture -where - A: Service, - F: Fn(A::Error) -> E, -{ - type Item = A::Response; - type Error = E; - - fn poll(&mut self) -> Poll { - self.fut.poll().map_err(&self.f) - } -} - -/// `MapErrNewService` new service combinator -pub struct MapErrNewService { - a: A, - f: F, - e: marker::PhantomData, -} - -impl MapErrNewService -where - A: NewService, - F: Fn(A::Error) -> E, -{ - /// Create new `MapErr` new service instance - pub fn new(a: A, f: F) -> Self { - Self { - a, - f, - e: marker::PhantomData, - } - } -} - -impl Clone for MapErrNewService -where - A: NewService + Clone, - F: Fn(A::Error) -> E + Clone, -{ - fn clone(&self) -> Self { - Self { - a: self.a.clone(), - f: self.f.clone(), - e: marker::PhantomData, - } - } -} - -impl NewService for MapErrNewService -where - A: NewService + Clone, - F: Fn(A::Error) -> E + Clone, -{ - type Request = A::Request; - type Response = A::Response; - type Error = E; - type Service = MapErr; - - type InitError = A::InitError; - type Future = MapErrNewServiceFuture; - - fn new_service(&self) -> Self::Future { - MapErrNewServiceFuture::new(self.a.new_service(), self.f.clone()) - } -} - -pub struct MapErrNewServiceFuture -where - A: NewService, - F: Fn(A::Error) -> E, -{ - fut: A::Future, - f: F, -} - -impl MapErrNewServiceFuture -where - A: NewService, - F: Fn(A::Error) -> E, -{ - fn new(fut: A::Future, f: F) -> Self { - MapErrNewServiceFuture { f, fut } - } -} - -impl Future for MapErrNewServiceFuture -where - A: NewService, - F: Fn(A::Error) -> E + Clone, -{ - type Item = MapErr; - type Error = A::InitError; - - fn poll(&mut self) -> Poll { - if let Async::Ready(service) = self.fut.poll()? { - Ok(Async::Ready(MapErr::new(service, self.f.clone()))) - } else { - Ok(Async::NotReady) - } - } -} - -/// `MapInitErr` service combinator -pub struct MapInitErr { - a: A, - f: F, - e: marker::PhantomData, -} - -impl MapInitErr -where - A: NewService, - F: Fn(A::InitError) -> E, -{ - /// Create new `MapInitErr` combinator - pub fn new(a: A, f: F) -> Self { - Self { - a, - f, - e: marker::PhantomData, - } - } -} - -impl Clone for MapInitErr -where - A: NewService + Clone, - F: Fn(A::InitError) -> E + Clone, -{ - fn clone(&self) -> Self { - Self { - a: self.a.clone(), - f: self.f.clone(), - e: marker::PhantomData, - } - } -} - -impl NewService for MapInitErr -where - A: NewService, - F: Fn(A::InitError) -> E + Clone, -{ - type Request = A::Request; - type Response = A::Response; - type Error = A::Error; - type Service = A::Service; - - type InitError = E; - type Future = MapInitErrFuture; - - fn new_service(&self) -> Self::Future { - MapInitErrFuture::new(self.a.new_service(), self.f.clone()) - } -} - -pub struct MapInitErrFuture -where - A: NewService, - F: Fn(A::InitError) -> E, -{ - f: F, - fut: A::Future, -} - -impl MapInitErrFuture -where - A: NewService, - F: Fn(A::InitError) -> E, -{ - fn new(fut: A::Future, f: F) -> Self { - MapInitErrFuture { f, fut } - } -} - -impl Future for MapInitErrFuture -where - A: NewService, - F: Fn(A::InitError) -> E, -{ - type Item = A::Service; - type Error = E; - - fn poll(&mut self) -> Poll { - self.fut.poll().map_err(&self.f) - } -} diff --git a/src/service/and_then.rs b/src/service/and_then.rs new file mode 100644 index 00000000..74d247ee --- /dev/null +++ b/src/service/and_then.rs @@ -0,0 +1,215 @@ +use std::cell::RefCell; +use std::rc::Rc; + +use futures::{Async, Future, Poll}; +use tower_service::{NewService, Service}; + +use super::IntoNewService; + +/// `AndThen` service combinator +pub struct AndThen { + a: A, + b: Rc>, +} + +impl AndThen +where + A: Service, + A::Error: Into, + B: Service, +{ + /// Create new `AndThen` combinator + pub fn new(a: A, b: B) -> Self { + Self { + a, + b: Rc::new(RefCell::new(b)), + } + } +} + +impl Service for AndThen +where + A: Service, + A::Error: Into, + B: Service, +{ + type Request = A::Request; + type Response = B::Response; + type Error = B::Error; + type Future = AndThenFuture; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + match self.a.poll_ready() { + Ok(Async::Ready(_)) => self.b.borrow_mut().poll_ready(), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(err) => Err(err.into()), + } + } + + fn call(&mut self, req: Self::Request) -> Self::Future { + AndThenFuture::new(self.a.call(req), self.b.clone()) + } +} + +pub struct AndThenFuture +where + A: Service, + A::Error: Into, + B: Service, +{ + b: Rc>, + fut_b: Option, + fut_a: A::Future, +} + +impl AndThenFuture +where + A: Service, + A::Error: Into, + B: Service, +{ + fn new(fut_a: A::Future, b: Rc>) -> Self { + AndThenFuture { + b, + fut_a, + fut_b: None, + } + } +} + +impl Future for AndThenFuture +where + A: Service, + A::Error: Into, + B: Service, +{ + type Item = B::Response; + type Error = B::Error; + + fn poll(&mut self) -> Poll { + if let Some(ref mut fut) = self.fut_b { + return fut.poll(); + } + + match self.fut_a.poll() { + Ok(Async::Ready(resp)) => { + self.fut_b = Some(self.b.borrow_mut().call(resp)); + self.poll() + } + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(err) => Err(err.into()), + } + } +} + +/// `AndThenNewService` new service combinator +pub struct AndThenNewService { + a: A, + b: B, +} + +impl AndThenNewService +where + A: NewService, + B: NewService, +{ + /// Create new `AndThen` combinator + pub fn new>(a: A, f: F) -> Self { + Self { + a, + b: f.into_new_service(), + } + } +} + +impl NewService for AndThenNewService +where + A: NewService, + A::Error: Into, + B: NewService, +{ + type Request = A::Request; + type Response = B::Response; + type Error = B::Error; + type Service = AndThen; + + type InitError = A::InitError; + type Future = AndThenNewServiceFuture; + + fn new_service(&self) -> Self::Future { + AndThenNewServiceFuture::new(self.a.new_service(), self.b.new_service()) + } +} + +impl Clone for AndThenNewService +where + A: NewService + Clone, + A::Error: Into, + B: NewService + Clone, +{ + fn clone(&self) -> Self { + Self { + a: self.a.clone(), + b: self.b.clone(), + } + } +} + +pub struct AndThenNewServiceFuture +where + A: NewService, + B: NewService, +{ + fut_b: B::Future, + fut_a: A::Future, + a: Option, + b: Option, +} + +impl AndThenNewServiceFuture +where + A: NewService, + B: NewService, +{ + fn new(fut_a: A::Future, fut_b: B::Future) -> Self { + AndThenNewServiceFuture { + fut_a, + fut_b, + a: None, + b: None, + } + } +} + +impl Future for AndThenNewServiceFuture +where + A: NewService, + A::Error: Into, + B: NewService, +{ + type Item = AndThen; + type Error = B::InitError; + + fn poll(&mut self) -> Poll { + if self.a.is_none() { + if let Async::Ready(service) = self.fut_a.poll()? { + self.a = Some(service); + } + } + + if self.b.is_none() { + if let Async::Ready(service) = self.fut_b.poll()? { + self.b = Some(service); + } + } + + if self.a.is_some() && self.b.is_some() { + Ok(Async::Ready(AndThen::new( + self.a.take().unwrap(), + self.b.take().unwrap(), + ))) + } else { + Ok(Async::NotReady) + } + } +} diff --git a/src/service/fn_service.rs b/src/service/fn_service.rs new file mode 100644 index 00000000..021c2203 --- /dev/null +++ b/src/service/fn_service.rs @@ -0,0 +1,120 @@ +use std::marker; + +use futures::{ + future::{ok, FutureResult}, + Async, IntoFuture, Poll, +}; +use tower_service::{NewService, Service}; + +use super::IntoNewService; + +pub struct FnService +where + F: Fn(Req) -> Fut, + Fut: IntoFuture, +{ + f: F, + req: marker::PhantomData, + resp: marker::PhantomData, + err: marker::PhantomData, +} + +impl FnService +where + F: Fn(Req) -> Fut, + Fut: IntoFuture, +{ + pub fn new(f: F) -> Self { + FnService { + f, + req: marker::PhantomData, + resp: marker::PhantomData, + err: marker::PhantomData, + } + } +} + +impl Service for FnService +where + F: Fn(Req) -> Fut, + Fut: IntoFuture, +{ + type Request = Req; + type Response = Resp; + type Error = E; + type Future = Fut::Future; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(Async::Ready(())) + } + + fn call(&mut self, req: Req) -> Self::Future { + (self.f)(req).into_future() + } +} + +pub struct FnNewService +where + F: Fn(Req) -> Fut, + Fut: IntoFuture, +{ + f: F, + req: marker::PhantomData, + resp: marker::PhantomData, + err: marker::PhantomData, + ierr: marker::PhantomData, +} + +impl FnNewService +where + F: Fn(Req) -> Fut + Clone, + Fut: IntoFuture, +{ + fn new(f: F) -> Self { + FnNewService { + f, + req: marker::PhantomData, + resp: marker::PhantomData, + err: marker::PhantomData, + ierr: marker::PhantomData, + } + } +} + +impl NewService for FnNewService +where + F: Fn(Req) -> Fut + Clone, + Fut: IntoFuture, +{ + type Request = Req; + type Response = Resp; + type Error = Err; + type Service = FnService; + type InitError = IErr; + type Future = FutureResult; + + fn new_service(&self) -> Self::Future { + ok(FnService::new(self.f.clone())) + } +} + +impl IntoNewService> + for F +where + F: Fn(Req) -> Fut + Clone + 'static, + Fut: IntoFuture, +{ + fn into_new_service(self) -> FnNewService { + FnNewService::new(self) + } +} + +impl Clone for FnNewService +where + F: Fn(Req) -> Fut + Clone, + Fut: IntoFuture, +{ + fn clone(&self) -> Self { + Self::new(self.f.clone()) + } +} diff --git a/src/service/fn_state_service.rs b/src/service/fn_state_service.rs new file mode 100644 index 00000000..4a76ee36 --- /dev/null +++ b/src/service/fn_state_service.rs @@ -0,0 +1,147 @@ +use std::marker; + +use futures::{Async, Future, IntoFuture, Poll}; +use tower_service::{NewService, Service}; + +use super::IntoNewService; + +pub struct FnStateService +where + F: Fn(&mut S, Req) -> Fut, + Fut: IntoFuture, +{ + f: F, + state: S, + req: marker::PhantomData, + resp: marker::PhantomData, + err: marker::PhantomData, +} + +impl FnStateService +where + F: Fn(&mut S, Req) -> Fut, + Fut: IntoFuture, +{ + pub fn new(state: S, f: F) -> Self { + FnStateService { + f, + state, + req: marker::PhantomData, + resp: marker::PhantomData, + err: marker::PhantomData, + } + } +} + +impl Service for FnStateService +where + F: Fn(&mut S, Req) -> Fut, + Fut: IntoFuture, +{ + type Request = Req; + type Response = Resp; + type Error = Err; + type Future = Fut::Future; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(Async::Ready(())) + } + + fn call(&mut self, req: Req) -> Self::Future { + (self.f)(&mut self.state, req).into_future() + } +} + +/// `NewService` for state and handler functions +pub struct FnStateNewService { + f: F1, + state: F2, + s: marker::PhantomData, + req: marker::PhantomData, + resp: marker::PhantomData, + err1: marker::PhantomData, + err2: marker::PhantomData, + fut1: marker::PhantomData, + fut2: marker::PhantomData, +} + +impl + FnStateNewService +{ + fn new(f: F1, state: F2) -> Self { + FnStateNewService { + f, + state, + s: marker::PhantomData, + req: marker::PhantomData, + resp: marker::PhantomData, + err1: marker::PhantomData, + err2: marker::PhantomData, + fut1: marker::PhantomData, + fut2: marker::PhantomData, + } + } +} + +impl NewService + for FnStateNewService +where + S: 'static, + F1: Fn(&mut S, Req) -> Fut1 + Clone + 'static, + F2: Fn() -> Fut2, + Fut1: IntoFuture + 'static, + Fut2: IntoFuture + 'static, + Req: 'static, + Resp: 'static, + Err1: 'static, + Err2: 'static, +{ + type Request = Req; + type Response = Resp; + type Error = Err1; + type Service = FnStateService; + type InitError = Err2; + type Future = Box>; + + fn new_service(&self) -> Self::Future { + let f = self.f.clone(); + Box::new( + (self.state)() + .into_future() + .and_then(move |state| Ok(FnStateService::new(state, f))), + ) + } +} + +impl + IntoNewService> for (F1, F2) +where + S: 'static, + F1: Fn(&mut S, Req) -> Fut1 + Clone + 'static, + F2: Fn() -> Fut2, + Fut1: IntoFuture + 'static, + Fut2: IntoFuture + 'static, + Req: 'static, + Resp: 'static, + Err1: 'static, + Err2: 'static, +{ + fn into_new_service( + self, + ) -> FnStateNewService { + FnStateNewService::new(self.0, self.1) + } +} + +impl Clone + for FnStateNewService +where + F1: Fn(&mut S, Req) -> Fut1 + Clone + 'static, + F2: Fn() -> Fut2 + Clone, + Fut1: IntoFuture, + Fut2: IntoFuture, +{ + fn clone(&self) -> Self { + Self::new(self.f.clone(), self.state.clone()) + } +} diff --git a/src/service/map_err.rs b/src/service/map_err.rs new file mode 100644 index 00000000..a4bcf1e1 --- /dev/null +++ b/src/service/map_err.rs @@ -0,0 +1,168 @@ +use std::marker; + +use futures::{Async, Future, Poll}; +use tower_service::{NewService, Service}; + +/// `MapErr` service combinator +pub struct MapErr { + a: A, + f: F, + e: marker::PhantomData, +} + +impl MapErr +where + A: Service, + F: Fn(A::Error) -> E, +{ + /// Create new `MapErr` combinator + pub fn new(a: A, f: F) -> Self { + Self { + a, + f, + e: marker::PhantomData, + } + } +} + +impl Service for MapErr +where + A: Service, + F: Fn(A::Error) -> E, + F: Clone, +{ + type Request = A::Request; + type Response = A::Response; + type Error = E; + type Future = MapErrFuture; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.a.poll_ready().map_err(&self.f) + } + + fn call(&mut self, req: Self::Request) -> Self::Future { + MapErrFuture::new(self.a.call(req), self.f.clone()) + } +} + +pub struct MapErrFuture +where + A: Service, + F: Fn(A::Error) -> E, +{ + f: F, + fut: A::Future, +} + +impl MapErrFuture +where + A: Service, + F: Fn(A::Error) -> E, +{ + fn new(fut: A::Future, f: F) -> Self { + MapErrFuture { f, fut } + } +} + +impl Future for MapErrFuture +where + A: Service, + F: Fn(A::Error) -> E, +{ + type Item = A::Response; + type Error = E; + + fn poll(&mut self) -> Poll { + self.fut.poll().map_err(&self.f) + } +} + +/// `MapErrNewService` new service combinator +pub struct MapErrNewService { + a: A, + f: F, + e: marker::PhantomData, +} + +impl MapErrNewService +where + A: NewService, + F: Fn(A::Error) -> E, +{ + /// Create new `MapErr` new service instance + pub fn new(a: A, f: F) -> Self { + Self { + a, + f, + e: marker::PhantomData, + } + } +} + +impl Clone for MapErrNewService +where + A: NewService + Clone, + F: Fn(A::Error) -> E + Clone, +{ + fn clone(&self) -> Self { + Self { + a: self.a.clone(), + f: self.f.clone(), + e: marker::PhantomData, + } + } +} + +impl NewService for MapErrNewService +where + A: NewService + Clone, + F: Fn(A::Error) -> E + Clone, +{ + type Request = A::Request; + type Response = A::Response; + type Error = E; + type Service = MapErr; + + type InitError = A::InitError; + type Future = MapErrNewServiceFuture; + + fn new_service(&self) -> Self::Future { + MapErrNewServiceFuture::new(self.a.new_service(), self.f.clone()) + } +} + +pub struct MapErrNewServiceFuture +where + A: NewService, + F: Fn(A::Error) -> E, +{ + fut: A::Future, + f: F, +} + +impl MapErrNewServiceFuture +where + A: NewService, + F: Fn(A::Error) -> E, +{ + fn new(fut: A::Future, f: F) -> Self { + MapErrNewServiceFuture { f, fut } + } +} + +impl Future for MapErrNewServiceFuture +where + A: NewService, + F: Fn(A::Error) -> E + Clone, +{ + type Item = MapErr; + type Error = A::InitError; + + fn poll(&mut self) -> Poll { + if let Async::Ready(service) = self.fut.poll()? { + Ok(Async::Ready(MapErr::new(service, self.f.clone()))) + } else { + Ok(Async::NotReady) + } + } +} diff --git a/src/service/map_init_err.rs b/src/service/map_init_err.rs new file mode 100644 index 00000000..1b5d1e1c --- /dev/null +++ b/src/service/map_init_err.rs @@ -0,0 +1,90 @@ +use std::marker; + +use futures::{Future, Poll}; +use tower_service::NewService; + +/// `MapInitErr` service combinator +pub struct MapInitErr { + a: A, + f: F, + e: marker::PhantomData, +} + +impl MapInitErr +where + A: NewService, + F: Fn(A::InitError) -> E, +{ + /// Create new `MapInitErr` combinator + pub fn new(a: A, f: F) -> Self { + Self { + a, + f, + e: marker::PhantomData, + } + } +} + +impl Clone for MapInitErr +where + A: NewService + Clone, + F: Fn(A::InitError) -> E + Clone, +{ + fn clone(&self) -> Self { + Self { + a: self.a.clone(), + f: self.f.clone(), + e: marker::PhantomData, + } + } +} + +impl NewService for MapInitErr +where + A: NewService, + F: Fn(A::InitError) -> E + Clone, +{ + type Request = A::Request; + type Response = A::Response; + type Error = A::Error; + type Service = A::Service; + + type InitError = E; + type Future = MapInitErrFuture; + + fn new_service(&self) -> Self::Future { + MapInitErrFuture::new(self.a.new_service(), self.f.clone()) + } +} + +pub struct MapInitErrFuture +where + A: NewService, + F: Fn(A::InitError) -> E, +{ + f: F, + fut: A::Future, +} + +impl MapInitErrFuture +where + A: NewService, + F: Fn(A::InitError) -> E, +{ + fn new(fut: A::Future, f: F) -> Self { + MapInitErrFuture { f, fut } + } +} + +impl Future for MapInitErrFuture +where + A: NewService, + F: Fn(A::InitError) -> E, +{ + type Item = A::Service; + type Error = E; + + fn poll(&mut self) -> Poll { + self.fut.poll().map_err(&self.f) + } +} diff --git a/src/service/mod.rs b/src/service/mod.rs new file mode 100644 index 00000000..e5d1d537 --- /dev/null +++ b/src/service/mod.rs @@ -0,0 +1,93 @@ +use futures::IntoFuture; +use tower_service::{NewService, Service}; + +mod and_then; +mod fn_service; +mod fn_state_service; +mod map_err; +mod map_init_err; + +pub use self::and_then::{AndThen, AndThenNewService}; +pub use self::fn_service::FnService; +pub use self::fn_state_service::FnStateService; +pub use self::map_err::{MapErr, MapErrNewService}; +pub use self::map_init_err::MapInitErr; + +pub trait NewServiceExt: NewService { + fn and_then(self, new_service: F) -> AndThenNewService + where + Self: Sized, + F: IntoNewService, + B: NewService< + Request = Self::Response, + Error = Self::Error, + InitError = Self::InitError, + >, + { + AndThenNewService::new(self, new_service) + } + + fn map_err(self, f: F) -> MapErrNewService + where + Self: Sized, + F: Fn(Self::Error) -> E, + { + MapErrNewService::new(self, f) + } + + fn map_init_err(self, f: F) -> MapInitErr + where + Self: Sized, + F: Fn(Self::InitError) -> E, + { + MapInitErr::new(self, f) + } +} + +impl NewServiceExt for T {} + +/// Trait for types that can be converted to a Service +pub trait IntoService +where + T: Service, +{ + /// Create service + fn into(self) -> T; +} + +/// Trait for types that can be converted to a Service +pub trait IntoNewService +where + T: NewService, +{ + /// Create service + fn into_new_service(self) -> T; +} + +impl IntoService for T +where + T: Service, +{ + fn into(self) -> T { + self + } +} + +impl IntoNewService for T +where + T: NewService, +{ + fn into_new_service(self) -> T { + self + } +} + +impl IntoService> for F +where + F: Fn(Req) -> Fut + 'static, + Fut: IntoFuture, +{ + fn into(self) -> FnService { + FnService::new(self) + } +} diff --git a/src/ssl/mod.rs b/src/ssl/mod.rs index 2fb85e16..5b900273 100644 --- a/src/ssl/mod.rs +++ b/src/ssl/mod.rs @@ -2,7 +2,7 @@ #[cfg(feature = "ssl")] mod openssl; #[cfg(feature = "ssl")] -pub use self::openssl::OpensslService; +pub use self::openssl::{OpensslAcceptor, OpensslConnector}; // #[cfg(feature = "tls")] // mod nativetls; diff --git a/src/ssl/openssl.rs b/src/ssl/openssl.rs index f4a33950..606e57bf 100644 --- a/src/ssl/openssl.rs +++ b/src/ssl/openssl.rs @@ -1,26 +1,25 @@ -use std::marker::PhantomData; -// use std::net::Shutdown; use std::io; +use std::marker::PhantomData; -use futures::{future, future::FutureResult, Async, Future, Poll}; -use openssl::ssl::{AlpnError, SslAcceptor, SslAcceptorBuilder}; +use futures::{future, future::FutureResult, Async, Poll}; +use openssl::ssl::{AlpnError, Error, SslAcceptor, SslAcceptorBuilder, SslConnector}; use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_openssl::{AcceptAsync, SslAcceptorExt, SslStream}; +use tokio_openssl::{AcceptAsync, ConnectAsync, SslAcceptorExt, SslConnectorExt, SslStream}; use {NewService, Service}; /// Support `SSL` connections via openssl package /// -/// `alpn` feature enables `OpensslAcceptor` type -pub struct OpensslService { +/// `ssl` feature enables `OpensslAcceptor` type +pub struct OpensslAcceptor { acceptor: SslAcceptor, io: PhantomData, } -impl OpensslService { - /// Create default `OpensslService` +impl OpensslAcceptor { + /// Create default `OpensslAcceptor` pub fn new(builder: SslAcceptorBuilder) -> Self { - OpensslService { + OpensslAcceptor { acceptor: builder.build(), io: PhantomData, } @@ -40,13 +39,13 @@ impl OpensslService { }); builder.set_alpn_protos(&protos[..])?; - Ok(OpensslService { + Ok(OpensslAcceptor { acceptor: builder.build(), io: PhantomData, }) } } -impl Clone for OpensslService { +impl Clone for OpensslAcceptor { fn clone(&self) -> Self { Self { acceptor: self.acceptor.clone(), @@ -55,69 +54,102 @@ impl Clone for OpensslService { } } -impl NewService for OpensslService { +impl NewService for OpensslAcceptor { type Request = T; type Response = SslStream; - type Error = io::Error; - type Service = OpensslAcceptor; + type Error = Error; + type Service = OpensslAcceptorService; type InitError = io::Error; type Future = FutureResult; fn new_service(&self) -> Self::Future { - future::ok(OpensslAcceptor { + future::ok(OpensslAcceptorService { acceptor: self.acceptor.clone(), io: PhantomData, }) } } -pub struct OpensslAcceptor { +pub struct OpensslAcceptorService { acceptor: SslAcceptor, io: PhantomData, } -impl Service for OpensslAcceptor { +impl Service for OpensslAcceptorService { type Request = T; type Response = SslStream; - type Error = io::Error; - type Future = AcceptorFuture; + type Error = Error; + type Future = AcceptAsync; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) } fn call(&mut self, req: Self::Request) -> Self::Future { - AcceptorFuture(SslAcceptorExt::accept_async(&self.acceptor, req)) + SslAcceptorExt::accept_async(&self.acceptor, req) } } -pub struct AcceptorFuture(AcceptAsync); +/// Openssl connector factory +pub struct OpensslConnector { + connector: SslConnector, + io: PhantomData, +} -impl Future for AcceptorFuture { - type Item = SslStream; - type Error = io::Error; - - fn poll(&mut self) -> Poll { - self.0 - .poll() - .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) +impl OpensslConnector { + pub fn new(connector: SslConnector) -> Self { + OpensslConnector { + connector, + io: PhantomData, + } } } -// impl IoStream for SslStream { -// #[inline] -// fn shutdown(&mut self, _how: Shutdown) -> io::Result<()> { -// let _ = self.get_mut().shutdown(); -// Ok(()) -// } +impl Clone for OpensslConnector { + fn clone(&self) -> Self { + Self { + connector: self.connector.clone(), + io: PhantomData, + } + } +} -// #[inline] -// fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> { -// self.get_mut().get_mut().set_nodelay(nodelay) -// } +impl NewService for OpensslConnector { + type Request = T; + type Response = SslStream; + type Error = Error; + type Service = OpensslConnectorService; + type InitError = io::Error; + type Future = FutureResult; -// #[inline] -// fn set_linger(&mut self, dur: Option) -> io::Result<()> { -// self.get_mut().get_mut().set_linger(dur) -// } -// } + fn new_service(&self) -> Self::Future { + future::ok(OpensslConnectorService { + connector: self.connector.clone(), + io: PhantomData, + }) + } +} + +pub trait OpensslDomain { + fn domain(&self) -> &str; +} + +pub struct OpensslConnectorService { + connector: SslConnector, + io: PhantomData, +} + +impl Service for OpensslConnectorService { + type Request = T; + type Response = SslStream; + type Error = Error; + type Future = ConnectAsync; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(Async::Ready(())) + } + + fn call(&mut self, req: Self::Request) -> Self::Future { + SslConnectorExt::connect_async(&self.connector, "", req) + } +}