1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-27 19:12:56 +01:00

revert generic request in actix-utils

This commit is contained in:
Nikolay Kim 2019-03-09 07:27:35 -08:00
parent 6bbbdba921
commit ac62e2dbf9
9 changed files with 173 additions and 119 deletions

View File

@ -13,9 +13,9 @@ pub struct CloneableService<T: 'static> {
} }
impl<T: 'static> CloneableService<T> { impl<T: 'static> CloneableService<T> {
pub fn new<R>(service: T) -> Self pub fn new(service: T) -> Self
where where
T: Service<R>, T: Service,
{ {
Self { Self {
service: Cell::new(service), service: Cell::new(service),
@ -33,10 +33,11 @@ impl<T: 'static> Clone for CloneableService<T> {
} }
} }
impl<T, R> Service<R> for CloneableService<T> impl<T> Service for CloneableService<T>
where where
T: Service<R> + 'static, T: Service + 'static,
{ {
type Request = T::Request;
type Response = T::Response; type Response = T::Response;
type Error = T::Error; type Error = T::Error;
type Future = T::Future; type Future = T::Future;
@ -45,7 +46,7 @@ where
self.service.get_mut().poll_ready() self.service.get_mut().poll_ready()
} }
fn call(&mut self, req: R) -> Self::Future { fn call(&mut self, req: T::Request) -> Self::Future {
self.service.get_mut().call(req) self.service.get_mut().call(req)
} }
} }

View File

@ -21,11 +21,12 @@ impl<A: Clone, B: Clone> Clone for EitherService<A, B> {
} }
} }
impl<A, B, R> Service<R> for EitherService<A, B> impl<A, B> Service for EitherService<A, B>
where where
A: Service<R>, A: Service,
B: Service<R, Response = A::Response, Error = A::Error>, B: Service<Request = A::Request, Response = A::Response, Error = A::Error>,
{ {
type Request = A::Request;
type Response = A::Response; type Response = A::Response;
type Error = A::Error; type Error = A::Error;
type Future = future::Either<A::Future, B::Future>; type Future = future::Either<A::Future, B::Future>;
@ -37,7 +38,7 @@ where
} }
} }
fn call(&mut self, req: R) -> Self::Future { fn call(&mut self, req: A::Request) -> Self::Future {
match self { match self {
EitherService::A(ref mut inner) => future::Either::A(inner.call(req)), EitherService::A(ref mut inner) => future::Either::A(inner.call(req)),
EitherService::B(ref mut inner) => future::Either::B(inner.call(req)), EitherService::B(ref mut inner) => future::Either::B(inner.call(req)),
@ -52,33 +53,52 @@ pub enum Either<A, B> {
} }
impl<A, B> Either<A, B> { impl<A, B> Either<A, B> {
pub fn new_a<R, C>(srv: A) -> Self pub fn new_a<C>(srv: A) -> Self
where where
A: NewService<R, C>, A: NewService<C>,
B: NewService<R, C, Response = A::Response, Error = A::Error, InitError = A::InitError>, B: NewService<
C,
Request = A::Request,
Response = A::Response,
Error = A::Error,
InitError = A::InitError,
>,
{ {
Either::A(srv) Either::A(srv)
} }
pub fn new_b<R, C>(srv: B) -> Self pub fn new_b<C>(srv: B) -> Self
where where
A: NewService<R, C>, A: NewService<C>,
B: NewService<R, C, Response = A::Response, Error = A::Error, InitError = A::InitError>, B: NewService<
C,
Request = A::Request,
Response = A::Response,
Error = A::Error,
InitError = A::InitError,
>,
{ {
Either::B(srv) Either::B(srv)
} }
} }
impl<A, B, R, C> NewService<R, C> for Either<A, B> impl<A, B, C> NewService<C> for Either<A, B>
where where
A: NewService<R, C>, A: NewService<C>,
B: NewService<R, C, Response = A::Response, Error = A::Error, InitError = A::InitError>, B: NewService<
C,
Request = A::Request,
Response = A::Response,
Error = A::Error,
InitError = A::InitError,
>,
{ {
type Request = A::Request;
type Response = A::Response; type Response = A::Response;
type Error = A::Error; type Error = A::Error;
type InitError = A::InitError; type InitError = A::InitError;
type Service = EitherService<A::Service, B::Service>; type Service = EitherService<A::Service, B::Service>;
type Future = EitherNewService<A, B, R, C>; type Future = EitherNewService<A, B, C>;
fn new_service(&self, cfg: &C) -> Self::Future { fn new_service(&self, cfg: &C) -> Self::Future {
match self { match self {
@ -98,15 +118,21 @@ impl<A: Clone, B: Clone> Clone for Either<A, B> {
} }
#[doc(hidden)] #[doc(hidden)]
pub enum EitherNewService<A: NewService<R, C>, B: NewService<R, C>, R, C> { pub enum EitherNewService<A: NewService<C>, B: NewService<C>, C> {
A(<A::Future as IntoFuture>::Future), A(<A::Future as IntoFuture>::Future),
B(<B::Future as IntoFuture>::Future), B(<B::Future as IntoFuture>::Future),
} }
impl<A, B, R, C> Future for EitherNewService<A, B, R, C> impl<A, B, C> Future for EitherNewService<A, B, C>
where where
A: NewService<R, C>, A: NewService<C>,
B: NewService<R, C, Response = A::Response, Error = A::Error, InitError = A::InitError>, B: NewService<
C,
Request = A::Request,
Response = A::Response,
Error = A::Error,
InitError = A::InitError,
>,
{ {
type Item = EitherService<A::Service, B::Service>; type Item = EitherService<A::Service, B::Service>;
type Error = A::InitError; type Error = A::InitError;

View File

@ -23,15 +23,15 @@ pub struct FramedNewService<S, T, U, C> {
impl<S, T, U, C> FramedNewService<S, T, U, C> impl<S, T, U, C> FramedNewService<S, T, U, C>
where where
C: Clone, C: Clone,
S: NewService<Request<U>, C, Response = Response<U>>, S: NewService<C, Request = Request<U>, Response = Response<U>>,
S::Error: 'static, S::Error: 'static,
<S::Service as Service<Request<U>>>::Future: 'static, <S::Service as Service>::Future: 'static,
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
U: Decoder + Encoder, U: Decoder + Encoder,
<U as Encoder>::Item: 'static, <U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug, <U as Encoder>::Error: std::fmt::Debug,
{ {
pub fn new<F1: IntoNewService<S, Request<U>, C>>(factory: F1) -> Self { pub fn new<F1: IntoNewService<S, C>>(factory: F1) -> Self {
Self { Self {
factory: factory.into_new_service(), factory: factory.into_new_service(),
_t: PhantomData, _t: PhantomData,
@ -51,17 +51,18 @@ where
} }
} }
impl<S, T, U, C> NewService<Framed<T, U>, C> for FramedNewService<S, T, U, C> impl<S, T, U, C> NewService<C> for FramedNewService<S, T, U, C>
where where
C: Clone, C: Clone,
S: NewService<Request<U>, C, Response = Response<U>> + Clone, S: NewService<C, Request = Request<U>, Response = Response<U>> + Clone,
S::Error: 'static, S::Error: 'static,
<S::Service as Service<Request<U>>>::Future: 'static, <S::Service as Service>::Future: 'static,
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
U: Decoder + Encoder, U: Decoder + Encoder,
<U as Encoder>::Item: 'static, <U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug, <U as Encoder>::Error: std::fmt::Debug,
{ {
type Request = Framed<T, U>;
type Response = FramedTransport<S::Service, T, U>; type Response = FramedTransport<S::Service, T, U>;
type Error = S::InitError; type Error = S::InitError;
type InitError = S::InitError; type InitError = S::InitError;
@ -97,17 +98,18 @@ where
} }
} }
impl<S, T, U, C> Service<Framed<T, U>> for FramedService<S, T, U, C> impl<S, T, U, C> Service for FramedService<S, T, U, C>
where where
S: NewService<Request<U>, C, Response = Response<U>>, S: NewService<C, Request = Request<U>, Response = Response<U>>,
S::Error: 'static, S::Error: 'static,
<S::Service as Service<Request<U>>>::Future: 'static, <S::Service as Service>::Future: 'static,
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
U: Decoder + Encoder, U: Decoder + Encoder,
<U as Encoder>::Item: 'static, <U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug, <U as Encoder>::Error: std::fmt::Debug,
C: Clone, C: Clone,
{ {
type Request = Framed<T, U>;
type Response = FramedTransport<S::Service, T, U>; type Response = FramedTransport<S::Service, T, U>;
type Error = S::InitError; type Error = S::InitError;
type Future = FramedServiceResponseFuture<S, T, U, C>; type Future = FramedServiceResponseFuture<S, T, U, C>;
@ -127,9 +129,9 @@ where
#[doc(hidden)] #[doc(hidden)]
pub struct FramedServiceResponseFuture<S, T, U, C> pub struct FramedServiceResponseFuture<S, T, U, C>
where where
S: NewService<Request<U>, C, Response = Response<U>>, S: NewService<C, Request = Request<U>, Response = Response<U>>,
S::Error: 'static, S::Error: 'static,
<S::Service as Service<Request<U>>>::Future: 'static, <S::Service as Service>::Future: 'static,
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
U: Decoder + Encoder, U: Decoder + Encoder,
<U as Encoder>::Item: 'static, <U as Encoder>::Item: 'static,
@ -141,9 +143,9 @@ where
impl<S, T, U, C> Future for FramedServiceResponseFuture<S, T, U, C> impl<S, T, U, C> Future for FramedServiceResponseFuture<S, T, U, C>
where where
S: NewService<Request<U>, C, Response = Response<U>>, S: NewService<C, Request = Request<U>, Response = Response<U>>,
S::Error: 'static, S::Error: 'static,
<S::Service as Service<Request<U>>>::Future: 'static, <S::Service as Service>::Future: 'static,
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
U: Decoder + Encoder, U: Decoder + Encoder,
<U as Encoder>::Item: 'static, <U as Encoder>::Item: 'static,
@ -180,7 +182,7 @@ impl<E, U: Encoder + Decoder> From<E> for FramedTransportError<E, U> {
/// and pass then to the service. /// and pass then to the service.
pub struct FramedTransport<S, T, U> pub struct FramedTransport<S, T, U>
where where
S: Service<Request<U>, Response = Response<U>>, S: Service<Request = Request<U>, Response = Response<U>>,
S::Error: 'static, S::Error: 'static,
S::Future: 'static, S::Future: 'static,
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
@ -194,7 +196,7 @@ where
inner: Cell<FramedTransportInner<<U as Encoder>::Item, S::Error>>, inner: Cell<FramedTransportInner<<U as Encoder>::Item, S::Error>>,
} }
enum TransportState<S: Service<Request<U>>, U: Encoder + Decoder> { enum TransportState<S: Service, U: Encoder + Decoder> {
Processing, Processing,
Error(FramedTransportError<S::Error, U>), Error(FramedTransportError<S::Error, U>),
FramedError(FramedTransportError<S::Error, U>), FramedError(FramedTransportError<S::Error, U>),
@ -208,7 +210,7 @@ struct FramedTransportInner<I, E> {
impl<S, T, U> FramedTransport<S, T, U> impl<S, T, U> FramedTransport<S, T, U>
where where
S: Service<Request<U>, Response = Response<U>>, S: Service<Request = Request<U>, Response = Response<U>>,
S::Error: 'static, S::Error: 'static,
S::Future: 'static, S::Future: 'static,
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
@ -300,7 +302,7 @@ where
impl<S, T, U> FramedTransport<S, T, U> impl<S, T, U> FramedTransport<S, T, U>
where where
S: Service<Request<U>, Response = Response<U>>, S: Service<Request = Request<U>, Response = Response<U>>,
S::Error: 'static, S::Error: 'static,
S::Future: 'static, S::Future: 'static,
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
@ -308,7 +310,7 @@ where
<U as Encoder>::Item: 'static, <U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug, <U as Encoder>::Error: std::fmt::Debug,
{ {
pub fn new<F: IntoService<S, Request<U>>>(framed: Framed<T, U>, service: F) -> Self { pub fn new<F: IntoService<S>>(framed: Framed<T, U>, service: F) -> Self {
FramedTransport { FramedTransport {
framed, framed,
service: service.into_service(), service: service.into_service(),
@ -346,7 +348,7 @@ where
impl<S, T, U> Future for FramedTransport<S, T, U> impl<S, T, U> Future for FramedTransport<S, T, U>
where where
S: Service<Request<U>, Response = Response<U>>, S: Service<Request = Request<U>, Response = Response<U>>,
S::Error: 'static, S::Error: 'static,
S::Future: 'static, S::Future: 'static,
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
@ -406,19 +408,20 @@ where
} }
} }
impl<T, U, F> NewService<T, ()> for IntoFramed<T, U, F> impl<T, C, U, F> NewService<C> for IntoFramed<T, U, F>
where where
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
F: Fn() -> U + Send + Clone + 'static, F: Fn() -> U + Send + Clone + 'static,
U: Encoder + Decoder, U: Encoder + Decoder,
{ {
type Request = T;
type Response = Framed<T, U>; type Response = Framed<T, U>;
type Error = (); type Error = ();
type InitError = (); type InitError = ();
type Service = IntoFramedService<T, U, F>; type Service = IntoFramedService<T, U, F>;
type Future = FutureResult<Self::Service, Self::InitError>; type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self, _: &()) -> Self::Future { fn new_service(&self, _: &C) -> Self::Future {
ok(IntoFramedService { ok(IntoFramedService {
factory: self.factory.clone(), factory: self.factory.clone(),
_t: PhantomData, _t: PhantomData,
@ -436,12 +439,13 @@ where
_t: PhantomData<(T,)>, _t: PhantomData<(T,)>,
} }
impl<T, U, F> Service<T> for IntoFramedService<T, U, F> impl<T, U, F> Service for IntoFramedService<T, U, F>
where where
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
F: Fn() -> U + Send + Clone + 'static, F: Fn() -> U + Send + Clone + 'static,
U: Encoder + Decoder, U: Encoder + Decoder,
{ {
type Request = T;
type Response = Framed<T, U>; type Response = Framed<T, U>;
type Error = (); type Error = ();
type Future = FutureResult<Self::Response, Self::Error>; type Future = FutureResult<Self::Response, Self::Error>;

View File

@ -24,7 +24,8 @@ impl Default for InFlight {
} }
} }
impl<S: Service<R>, R> Transform<S, R> for InFlight { impl<S: Service> Transform<S> for InFlight {
type Request = S::Request;
type Response = S::Response; type Response = S::Response;
type Error = S::Error; type Error = S::Error;
type InitError = Void; type InitError = Void;
@ -50,13 +51,14 @@ impl<S> InFlightService<S> {
} }
} }
impl<T, R> Service<R> for InFlightService<T> impl<T> Service for InFlightService<T>
where where
T: Service<R>, T: Service,
{ {
type Request = T::Request;
type Response = T::Response; type Response = T::Response;
type Error = T::Error; type Error = T::Error;
type Future = InFlightServiceResponse<T, R>; type Future = InFlightServiceResponse<T>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.service.poll_ready()?; self.service.poll_ready()?;
@ -69,7 +71,7 @@ where
} }
} }
fn call(&mut self, req: R) -> Self::Future { fn call(&mut self, req: T::Request) -> Self::Future {
InFlightServiceResponse { InFlightServiceResponse {
fut: self.service.call(req), fut: self.service.call(req),
_guard: self.count.get(), _guard: self.count.get(),
@ -78,12 +80,12 @@ where
} }
#[doc(hidden)] #[doc(hidden)]
pub struct InFlightServiceResponse<T: Service<R>, R> { pub struct InFlightServiceResponse<T: Service> {
fut: T::Future, fut: T::Future,
_guard: CounterGuard, _guard: CounterGuard,
} }
impl<T: Service<R>, R> Future for InFlightServiceResponse<T, R> { impl<T: Service> Future for InFlightServiceResponse<T> {
type Item = T::Response; type Item = T::Response;
type Error = T::Error; type Error = T::Error;
@ -105,7 +107,8 @@ mod tests {
struct SleepService(Duration); struct SleepService(Duration);
impl Service<()> for SleepService { impl Service for SleepService {
type Request = ();
type Response = (); type Response = ();
type Error = (); type Error = ();
type Future = Box<Future<Item = (), Error = ()>>; type Future = Box<Future<Item = (), Error = ()>>;

View File

@ -43,10 +43,11 @@ where
} }
} }
impl<R, E, F> NewService<R, ()> for KeepAlive<R, E, F> impl<R, E, F> NewService<()> for KeepAlive<R, E, F>
where where
F: Fn() -> E + Clone, F: Fn() -> E + Clone,
{ {
type Request = R;
type Response = R; type Response = R;
type Error = E; type Error = E;
type InitError = Void; type InitError = Void;
@ -88,10 +89,11 @@ where
} }
} }
impl<R, E, F> Service<R> for KeepAliveService<R, E, F> impl<R, E, F> Service for KeepAliveService<R, E, F>
where where
F: Fn() -> E, F: Fn() -> E,
{ {
type Request = R;
type Response = R; type Response = R;
type Error = E; type Error = E;
type Future = FutureResult<R, E>; type Future = FutureResult<R, E>;

View File

@ -52,39 +52,46 @@ pub struct InOrder<S> {
_t: PhantomData<S>, _t: PhantomData<S>,
} }
impl<S> InOrder<S> { impl<S> InOrder<S>
pub fn new<R>() -> Self
where
S: Service<R>,
S::Response: 'static,
S::Future: 'static,
S::Error: 'static,
{
Self { _t: PhantomData }
}
pub fn service<R>(service: S) -> InOrderService<S, R>
where
S: Service<R>,
S::Response: 'static,
S::Future: 'static,
S::Error: 'static,
{
InOrderService::new(service)
}
}
impl<S, R> Transform<S, R> for InOrder<S>
where where
S: Service<R>, S: Service,
S::Response: 'static, S::Response: 'static,
S::Future: 'static, S::Future: 'static,
S::Error: 'static, S::Error: 'static,
{ {
pub fn new() -> Self {
Self { _t: PhantomData }
}
pub fn service(service: S) -> InOrderService<S> {
InOrderService::new(service)
}
}
impl<S> Default for InOrder<S>
where
S: Service,
S::Response: 'static,
S::Future: 'static,
S::Error: 'static,
{
fn default() -> Self {
Self::new()
}
}
impl<S> Transform<S> for InOrder<S>
where
S: Service,
S::Response: 'static,
S::Future: 'static,
S::Error: 'static,
{
type Request = S::Request;
type Response = S::Response; type Response = S::Response;
type Error = InOrderError<S::Error>; type Error = InOrderError<S::Error>;
type InitError = Void; type InitError = Void;
type Transform = InOrderService<S, R>; type Transform = InOrderService<S>;
type Future = FutureResult<Self::Transform, Self::InitError>; type Future = FutureResult<Self::Transform, Self::InitError>;
fn new_transform(&self, service: S) -> Self::Future { fn new_transform(&self, service: S) -> Self::Future {
@ -92,15 +99,15 @@ where
} }
} }
pub struct InOrderService<S: Service<R>, R> { pub struct InOrderService<S: Service> {
service: S, service: S,
task: Rc<AtomicTask>, task: Rc<AtomicTask>,
acks: VecDeque<Record<S::Response, S::Error>>, acks: VecDeque<Record<S::Response, S::Error>>,
} }
impl<S, R> InOrderService<S, R> impl<S> InOrderService<S>
where where
S: Service<R>, S: Service,
S::Response: 'static, S::Response: 'static,
S::Future: 'static, S::Future: 'static,
S::Error: 'static, S::Error: 'static,
@ -114,16 +121,17 @@ where
} }
} }
impl<S, R> Service<R> for InOrderService<S, R> impl<S> Service for InOrderService<S>
where where
S: Service<R>, S: Service,
S::Response: 'static, S::Response: 'static,
S::Future: 'static, S::Future: 'static,
S::Error: 'static, S::Error: 'static,
{ {
type Request = S::Request;
type Response = S::Response; type Response = S::Response;
type Error = InOrderError<S::Error>; type Error = InOrderError<S::Error>;
type Future = InOrderServiceResponse<S, R>; type Future = InOrderServiceResponse<S>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self) -> Poll<(), Self::Error> {
// poll_ready could be called from different task // poll_ready could be called from different task
@ -148,7 +156,7 @@ where
Ok(Async::Ready(())) Ok(Async::Ready(()))
} }
fn call(&mut self, request: R) -> Self::Future { fn call(&mut self, request: S::Request) -> Self::Future {
let (tx1, rx1) = oneshot::channel(); let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel(); let (tx2, rx2) = oneshot::channel();
self.acks.push_back(Record { rx: rx1, tx: tx2 }); self.acks.push_back(Record { rx: rx1, tx: tx2 });
@ -165,11 +173,11 @@ where
} }
#[doc(hidden)] #[doc(hidden)]
pub struct InOrderServiceResponse<S: Service<R>, R> { pub struct InOrderServiceResponse<S: Service> {
rx: oneshot::Receiver<Result<S::Response, S::Error>>, rx: oneshot::Receiver<Result<S::Response, S::Error>>,
} }
impl<S: Service<R>, R> Future for InOrderServiceResponse<S, R> { impl<S: Service> Future for InOrderServiceResponse<S> {
type Item = S::Response; type Item = S::Response;
type Error = InOrderError<S::Error>; type Error = InOrderError<S::Error>;
@ -196,7 +204,8 @@ mod tests {
struct Srv; struct Srv;
impl Service<oneshot::Receiver<usize>> for Srv { impl Service for Srv {
type Request = oneshot::Receiver<usize>;
type Response = usize; type Response = usize;
type Error = (); type Error = ();
type Future = Box<Future<Item = usize, Error = ()>>; type Future = Box<Future<Item = usize, Error = ()>>;
@ -210,11 +219,11 @@ mod tests {
} }
} }
struct SrvPoll<S: Service<oneshot::Receiver<usize>>> { struct SrvPoll<S: Service> {
s: S, s: S,
} }
impl<S: Service<oneshot::Receiver<usize>>> Future for SrvPoll<S> { impl<S: Service> Future for SrvPoll<S> {
type Item = (); type Item = ();
type Error = (); type Error = ();

View File

@ -38,12 +38,12 @@ impl<S, T, E, C> StreamNewService<S, T, E, C>
where where
C: Clone, C: Clone,
S: IntoStream, S: IntoStream,
T: NewService<Request<S>, C, Response = (), Error = E, InitError = E>, T: NewService<C, Request = Request<S>, Response = (), Error = E, InitError = E>,
T::Future: 'static, T::Future: 'static,
T::Service: 'static, T::Service: 'static,
<T::Service as Service<Request<S>>>::Future: 'static, <T::Service as Service>::Future: 'static,
{ {
pub fn new<F: IntoNewService<T, Request<S>, C>>(factory: F) -> Self { pub fn new<F: IntoNewService<T, C>>(factory: F) -> Self {
Self { Self {
factory: Rc::new(factory.into_new_service()), factory: Rc::new(factory.into_new_service()),
_t: PhantomData, _t: PhantomData,
@ -60,15 +60,16 @@ impl<S, T, E, C> Clone for StreamNewService<S, T, E, C> {
} }
} }
impl<S, T, E, C> NewService<S, C> for StreamNewService<S, T, E, C> impl<S, T, E, C> NewService<C> for StreamNewService<S, T, E, C>
where where
C: Clone, C: Clone,
S: IntoStream + 'static, S: IntoStream + 'static,
T: NewService<Request<S>, C, Response = (), Error = E, InitError = E>, T: NewService<C, Request = Request<S>, Response = (), Error = E, InitError = E>,
T::Future: 'static, T::Future: 'static,
T::Service: 'static, T::Service: 'static,
<T::Service as Service<Request<S>>>::Future: 'static, <T::Service as Service>::Future: 'static,
{ {
type Request = S;
type Response = (); type Response = ();
type Error = E; type Error = E;
type InitError = E; type InitError = E;
@ -90,15 +91,16 @@ pub struct StreamService<S, T, E, C = ()> {
_t: PhantomData<(S, E)>, _t: PhantomData<(S, E)>,
} }
impl<S, T, E, C> Service<S> for StreamService<S, T, E, C> impl<S, T, E, C> Service for StreamService<S, T, E, C>
where where
S: IntoStream + 'static, S: IntoStream + 'static,
T: NewService<Request<S>, C, Response = (), Error = E, InitError = E>, T: NewService<C, Request = Request<S>, Response = (), Error = E, InitError = E>,
T::Future: 'static, T::Future: 'static,
T::Service: 'static, T::Service: 'static,
<T::Service as Service<Request<S>>>::Future: 'static, <T::Service as Service>::Future: 'static,
C: Clone, C: Clone,
{ {
type Request = S;
type Response = (); type Response = ();
type Error = E; type Error = E;
type Future = Box<Future<Item = (), Error = E>>; type Future = Box<Future<Item = (), Error = E>>;
@ -119,7 +121,7 @@ where
pub struct StreamDispatcher<S, T> pub struct StreamDispatcher<S, T>
where where
S: IntoStream + 'static, S: IntoStream + 'static,
T: Service<Request<S>, Response = ()> + 'static, T: Service<Request = Request<S>, Response = ()> + 'static,
T::Future: 'static, T::Future: 'static,
{ {
stream: S, stream: S,
@ -131,13 +133,13 @@ where
impl<S, T> StreamDispatcher<S, T> impl<S, T> StreamDispatcher<S, T>
where where
S: Stream, S: Stream,
T: Service<Request<S>, Response = ()>, T: Service<Request = Request<S>, Response = ()>,
T::Future: 'static, T::Future: 'static,
{ {
pub fn new<F1, F2>(stream: F1, service: F2) -> Self pub fn new<F1, F2>(stream: F1, service: F2) -> Self
where where
F1: IntoStream<Stream = S, Item = S::Item, Error = S::Error>, F1: IntoStream<Stream = S, Item = S::Item, Error = S::Error>,
F2: IntoService<T, Request<S>>, F2: IntoService<T>,
{ {
let (err_tx, err_rx) = mpsc::unbounded(); let (err_tx, err_rx) = mpsc::unbounded();
StreamDispatcher { StreamDispatcher {
@ -152,7 +154,7 @@ where
impl<S, T> Future for StreamDispatcher<S, T> impl<S, T> Future for StreamDispatcher<S, T>
where where
S: Stream, S: Stream,
T: Service<Request<S>, Response = ()>, T: Service<Request = Request<S>, Response = ()>,
T::Future: 'static, T::Future: 'static,
{ {
type Item = (); type Item = ();
@ -230,14 +232,15 @@ impl<T> Clone for TakeItem<T> {
} }
} }
impl<T: Stream> NewService<T, ()> for TakeItem<T> { impl<T: Stream, C> NewService<C> for TakeItem<T> {
type Request = T;
type Response = (Option<T::Item>, T); type Response = (Option<T::Item>, T);
type Error = T::Error; type Error = T::Error;
type InitError = (); type InitError = ();
type Service = TakeItemService<T>; type Service = TakeItemService<T>;
type Future = FutureResult<Self::Service, Self::InitError>; type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self, _: &()) -> Self::Future { fn new_service(&self, _: &C) -> Self::Future {
ok(TakeItemService { _t: PhantomData }) ok(TakeItemService { _t: PhantomData })
} }
} }
@ -253,7 +256,8 @@ impl<T> Clone for TakeItemService<T> {
} }
} }
impl<T: Stream> Service<T> for TakeItemService<T> { impl<T: Stream> Service for TakeItemService<T> {
type Request = T;
type Response = (Option<T::Item>, T); type Response = (Option<T::Item>, T);
type Error = T::Error; type Error = T::Error;
type Future = TakeItemServiceResponse<T>; type Future = TakeItemServiceResponse<T>;

View File

@ -42,6 +42,7 @@ impl Default for LowResTime {
} }
impl NewService<()> for LowResTime { impl NewService<()> for LowResTime {
type Request = ();
type Response = Instant; type Response = Instant;
type Error = Void; type Error = Void;
type InitError = Void; type InitError = Void;
@ -87,7 +88,8 @@ impl LowResTimeService {
} }
} }
impl Service<()> for LowResTimeService { impl Service for LowResTimeService {
type Request = ();
type Response = Instant; type Response = Instant;
type Error = Void; type Error = Void;
type Future = FutureResult<Self::Response, Self::Error>; type Future = FutureResult<Self::Response, Self::Error>;

View File

@ -80,10 +80,11 @@ impl<E> Clone for Timeout<E> {
} }
} }
impl<S, R, E> Transform<S, R> for Timeout<E> impl<S, E> Transform<S> for Timeout<E>
where where
S: Service<R>, S: Service,
{ {
type Request = S::Request;
type Response = S::Response; type Response = S::Response;
type Error = TimeoutError<S::Error>; type Error = TimeoutError<S::Error>;
type InitError = E; type InitError = E;
@ -111,19 +112,20 @@ impl<S> TimeoutService<S> {
} }
} }
impl<S, R> Service<R> for TimeoutService<S> impl<S> Service for TimeoutService<S>
where where
S: Service<R>, S: Service,
{ {
type Request = S::Request;
type Response = S::Response; type Response = S::Response;
type Error = TimeoutError<S::Error>; type Error = TimeoutError<S::Error>;
type Future = TimeoutServiceResponse<S, R>; type Future = TimeoutServiceResponse<S>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.service.poll_ready().map_err(TimeoutError::Service) self.service.poll_ready().map_err(TimeoutError::Service)
} }
fn call(&mut self, request: R) -> Self::Future { fn call(&mut self, request: S::Request) -> Self::Future {
TimeoutServiceResponse { TimeoutServiceResponse {
fut: self.service.call(request), fut: self.service.call(request),
sleep: Delay::new(clock::now() + self.timeout), sleep: Delay::new(clock::now() + self.timeout),
@ -133,14 +135,14 @@ where
/// `TimeoutService` response future /// `TimeoutService` response future
#[derive(Debug)] #[derive(Debug)]
pub struct TimeoutServiceResponse<T: Service<R>, R> { pub struct TimeoutServiceResponse<T: Service> {
fut: T::Future, fut: T::Future,
sleep: Delay, sleep: Delay,
} }
impl<T, R> Future for TimeoutServiceResponse<T, R> impl<T> Future for TimeoutServiceResponse<T>
where where
T: Service<R>, T: Service,
{ {
type Item = T::Response; type Item = T::Response;
type Error = TimeoutError<T::Error>; type Error = TimeoutError<T::Error>;
@ -175,7 +177,8 @@ mod tests {
struct SleepService(Duration); struct SleepService(Duration);
impl Service<()> for SleepService { impl Service for SleepService {
type Request = ();
type Response = (); type Response = ();
type Error = (); type Error = ();
type Future = Box<Future<Item = (), Error = ()>>; type Future = Box<Future<Item = (), Error = ()>>;