1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-06-26 19:47:43 +02:00

make service Request type generic

This commit is contained in:
Nikolay Kim
2019-03-05 07:35:26 -08:00
parent e8a49801eb
commit dfbb77f98d
34 changed files with 622 additions and 747 deletions

View File

@ -1,5 +1,10 @@
# Changes
## [0.4.0] - 2019-03-xx
* Upgrade actix-service
## [0.3.2] - 2019-03-04
### Changed

View File

@ -13,9 +13,9 @@ pub struct CloneableService<T: 'static> {
}
impl<T: 'static> CloneableService<T> {
pub fn new(service: T) -> Self
pub fn new<R>(service: T) -> Self
where
T: Service,
T: Service<R>,
{
Self {
service: Cell::new(service),
@ -33,11 +33,10 @@ impl<T: 'static> Clone for CloneableService<T> {
}
}
impl<T> Service for CloneableService<T>
impl<T, R> Service<R> for CloneableService<T>
where
T: Service + 'static,
T: Service<R> + 'static,
{
type Request = T::Request;
type Response = T::Response;
type Error = T::Error;
type Future = T::Future;
@ -46,7 +45,7 @@ where
self.service.get_mut().poll_ready()
}
fn call(&mut self, req: T::Request) -> Self::Future {
fn call(&mut self, req: R) -> Self::Future {
self.service.get_mut().call(req)
}
}

View File

@ -21,12 +21,11 @@ impl<A: Clone, B: Clone> Clone for EitherService<A, B> {
}
}
impl<A, B> Service for EitherService<A, B>
impl<A, B, R> Service<R> for EitherService<A, B>
where
A: Service,
B: Service<Request = A::Request, Response = A::Response, Error = A::Error>,
A: Service<R>,
B: Service<R, Response = A::Response, Error = A::Error>,
{
type Request = A::Request;
type Response = A::Response;
type Error = A::Error;
type Future = future::Either<A::Future, B::Future>;
@ -38,7 +37,7 @@ where
}
}
fn call(&mut self, req: A::Request) -> Self::Future {
fn call(&mut self, req: R) -> Self::Future {
match self {
EitherService::A(ref mut inner) => future::Either::A(inner.call(req)),
EitherService::B(ref mut inner) => future::Either::B(inner.call(req)),
@ -53,52 +52,33 @@ pub enum Either<A, B> {
}
impl<A, B> Either<A, B> {
pub fn new_a<C>(srv: A) -> Self
pub fn new_a<R, C>(srv: A) -> Self
where
A: NewService<C>,
B: NewService<
C,
Request = A::Request,
Response = A::Response,
Error = A::Error,
InitError = A::InitError,
>,
A: NewService<R, C>,
B: NewService<R, C, Response = A::Response, Error = A::Error, InitError = A::InitError>,
{
Either::A(srv)
}
pub fn new_b<C>(srv: B) -> Self
pub fn new_b<R, C>(srv: B) -> Self
where
A: NewService<C>,
B: NewService<
C,
Request = A::Request,
Response = A::Response,
Error = A::Error,
InitError = A::InitError,
>,
A: NewService<R, C>,
B: NewService<R, C, Response = A::Response, Error = A::Error, InitError = A::InitError>,
{
Either::B(srv)
}
}
impl<A, B, C> NewService<C> for Either<A, B>
impl<A, B, R, C> NewService<R, C> for Either<A, B>
where
A: NewService<C>,
B: NewService<
C,
Request = A::Request,
Response = A::Response,
Error = A::Error,
InitError = A::InitError,
>,
A: NewService<R, C>,
B: NewService<R, C, Response = A::Response, Error = A::Error, InitError = A::InitError>,
{
type Request = A::Request;
type Response = A::Response;
type Error = A::Error;
type InitError = A::InitError;
type Service = EitherService<A::Service, B::Service>;
type Future = EitherNewService<A, B, C>;
type Future = EitherNewService<A, B, R, C>;
fn new_service(&self, cfg: &C) -> Self::Future {
match self {
@ -118,21 +98,15 @@ impl<A: Clone, B: Clone> Clone for Either<A, B> {
}
#[doc(hidden)]
pub enum EitherNewService<A: NewService<C>, B: NewService<C>, C> {
pub enum EitherNewService<A: NewService<R, C>, B: NewService<R, C>, R, C> {
A(<A::Future as IntoFuture>::Future),
B(<B::Future as IntoFuture>::Future),
}
impl<A, B, C> Future for EitherNewService<A, B, C>
impl<A, B, R, C> Future for EitherNewService<A, B, R, C>
where
A: NewService<C>,
B: NewService<
C,
Request = A::Request,
Response = A::Response,
Error = A::Error,
InitError = A::InitError,
>,
A: NewService<R, C>,
B: NewService<R, C, Response = A::Response, Error = A::Error, InitError = A::InitError>,
{
type Item = EitherService<A::Service, B::Service>;
type Error = A::InitError;

View File

@ -23,15 +23,15 @@ pub struct FramedNewService<S, T, U, C> {
impl<S, T, U, C> FramedNewService<S, T, U, C>
where
C: Clone,
S: NewService<C, Request = Request<U>, Response = Response<U>>,
S: NewService<Request<U>, C, Response = Response<U>>,
S::Error: 'static,
<S::Service as Service>::Future: 'static,
<S::Service as Service<Request<U>>>::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
pub fn new<F1: IntoNewService<S, C>>(factory: F1) -> Self {
pub fn new<F1: IntoNewService<S, Request<U>, C>>(factory: F1) -> Self {
Self {
factory: factory.into_new_service(),
_t: PhantomData,
@ -51,18 +51,17 @@ where
}
}
impl<S, T, U, C> NewService<C> for FramedNewService<S, T, U, C>
impl<S, T, U, C> NewService<Framed<T, U>, C> for FramedNewService<S, T, U, C>
where
C: Clone,
S: NewService<C, Request = Request<U>, Response = Response<U>> + Clone,
S: NewService<Request<U>, C, Response = Response<U>> + Clone,
S::Error: 'static,
<S::Service as Service>::Future: 'static,
<S::Service as Service<Request<U>>>::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
type Request = Framed<T, U>;
type Response = FramedTransport<S::Service, T, U>;
type Error = S::InitError;
type InitError = S::InitError;
@ -98,18 +97,17 @@ where
}
}
impl<S, T, U, C> Service for FramedService<S, T, U, C>
impl<S, T, U, C> Service<Framed<T, U>> for FramedService<S, T, U, C>
where
S: NewService<C, Request = Request<U>, Response = Response<U>>,
S: NewService<Request<U>, C, Response = Response<U>>,
S::Error: 'static,
<S::Service as Service>::Future: 'static,
<S::Service as Service<Request<U>>>::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
C: Clone,
{
type Request = Framed<T, U>;
type Response = FramedTransport<S::Service, T, U>;
type Error = S::InitError;
type Future = FramedServiceResponseFuture<S, T, U, C>;
@ -129,9 +127,9 @@ where
#[doc(hidden)]
pub struct FramedServiceResponseFuture<S, T, U, C>
where
S: NewService<C, Request = Request<U>, Response = Response<U>>,
S: NewService<Request<U>, C, Response = Response<U>>,
S::Error: 'static,
<S::Service as Service>::Future: 'static,
<S::Service as Service<Request<U>>>::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
@ -143,9 +141,9 @@ where
impl<S, T, U, C> Future for FramedServiceResponseFuture<S, T, U, C>
where
S: NewService<C, Request = Request<U>, Response = Response<U>>,
S: NewService<Request<U>, C, Response = Response<U>>,
S::Error: 'static,
<S::Service as Service>::Future: 'static,
<S::Service as Service<Request<U>>>::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
@ -182,7 +180,7 @@ impl<E, U: Encoder + Decoder> From<E> for FramedTransportError<E, U> {
/// and pass then to the service.
pub struct FramedTransport<S, T, U>
where
S: Service<Request = Request<U>, Response = Response<U>>,
S: Service<Request<U>, Response = Response<U>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
@ -196,7 +194,7 @@ where
inner: Cell<FramedTransportInner<<U as Encoder>::Item, S::Error>>,
}
enum TransportState<S: Service, U: Encoder + Decoder> {
enum TransportState<S: Service<Request<U>>, U: Encoder + Decoder> {
Processing,
Error(FramedTransportError<S::Error, U>),
FramedError(FramedTransportError<S::Error, U>),
@ -210,7 +208,7 @@ struct FramedTransportInner<I, E> {
impl<S, T, U> FramedTransport<S, T, U>
where
S: Service<Request = Request<U>, Response = Response<U>>,
S: Service<Request<U>, Response = Response<U>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
@ -302,7 +300,7 @@ where
impl<S, T, U> FramedTransport<S, T, U>
where
S: Service<Request = Request<U>, Response = Response<U>>,
S: Service<Request<U>, Response = Response<U>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
@ -310,7 +308,7 @@ where
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
pub fn new<F: IntoService<S>>(framed: Framed<T, U>, service: F) -> Self {
pub fn new<F: IntoService<S, Request<U>>>(framed: Framed<T, U>, service: F) -> Self {
FramedTransport {
framed,
service: service.into_service(),
@ -348,7 +346,7 @@ where
impl<S, T, U> Future for FramedTransport<S, T, U>
where
S: Service<Request = Request<U>, Response = Response<U>>,
S: Service<Request<U>, Response = Response<U>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
@ -408,13 +406,12 @@ where
}
}
impl<T, U, F> NewService<()> for IntoFramed<T, U, F>
impl<T, U, F> NewService<T, ()> for IntoFramed<T, U, F>
where
T: AsyncRead + AsyncWrite,
F: Fn() -> U + Send + Clone + 'static,
U: Encoder + Decoder,
{
type Request = T;
type Response = Framed<T, U>;
type Error = ();
type InitError = ();
@ -439,13 +436,12 @@ where
_t: PhantomData<(T,)>,
}
impl<T, U, F> Service for IntoFramedService<T, U, F>
impl<T, U, F> Service<T> for IntoFramedService<T, U, F>
where
T: AsyncRead + AsyncWrite,
F: Fn() -> U + Send + Clone + 'static,
U: Encoder + Decoder,
{
type Request = T;
type Response = Framed<T, U>;
type Error = ();
type Future = FutureResult<Self::Response, Self::Error>;

View File

@ -24,8 +24,7 @@ impl Default for InFlight {
}
}
impl<S: Service> Transform<S> for InFlight {
type Request = S::Request;
impl<S: Service<R>, R> Transform<R, S> for InFlight {
type Response = S::Response;
type Error = S::Error;
type InitError = Void;
@ -51,14 +50,13 @@ impl<S> InFlightService<S> {
}
}
impl<T> Service for InFlightService<T>
impl<T, R> Service<R> for InFlightService<T>
where
T: Service,
T: Service<R>,
{
type Request = T::Request;
type Response = T::Response;
type Error = T::Error;
type Future = InFlightServiceResponse<T>;
type Future = InFlightServiceResponse<T, R>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.service.poll_ready()?;
@ -71,7 +69,7 @@ where
}
}
fn call(&mut self, req: T::Request) -> Self::Future {
fn call(&mut self, req: R) -> Self::Future {
InFlightServiceResponse {
fut: self.service.call(req),
_guard: self.count.get(),
@ -80,12 +78,12 @@ where
}
#[doc(hidden)]
pub struct InFlightServiceResponse<T: Service> {
pub struct InFlightServiceResponse<T: Service<R>, R> {
fut: T::Future,
_guard: CounterGuard,
}
impl<T: Service> Future for InFlightServiceResponse<T> {
impl<T: Service<R>, R> Future for InFlightServiceResponse<T, R> {
type Item = T::Response;
type Error = T::Error;
@ -107,8 +105,7 @@ mod tests {
struct SleepService(Duration);
impl Service for SleepService {
type Request = ();
impl Service<()> for SleepService {
type Response = ();
type Error = ();
type Future = Box<Future<Item = (), Error = ()>>;

View File

@ -43,11 +43,10 @@ where
}
}
impl<R, E, F> NewService<()> for KeepAlive<R, E, F>
impl<R, E, F> NewService<R, ()> for KeepAlive<R, E, F>
where
F: Fn() -> E + Clone,
{
type Request = R;
type Response = R;
type Error = E;
type InitError = Void;
@ -89,11 +88,10 @@ where
}
}
impl<R, E, F> Service for KeepAliveService<R, E, F>
impl<R, E, F> Service<R> for KeepAliveService<R, E, F>
where
F: Fn() -> E,
{
type Request = R;
type Response = R;
type Error = E;
type Future = FutureResult<R, E>;

View File

@ -52,46 +52,39 @@ pub struct InOrder<S> {
_t: PhantomData<S>,
}
impl<S> InOrder<S>
where
S: Service,
S::Response: 'static,
S::Future: 'static,
S::Error: 'static,
{
pub fn new() -> Self {
impl<S> InOrder<S> {
pub fn new<R>() -> Self
where
S: Service<R>,
S::Response: 'static,
S::Future: 'static,
S::Error: 'static,
{
Self { _t: PhantomData }
}
pub fn service(service: S) -> InOrderService<S> {
pub fn service<R>(service: S) -> InOrderService<S, R>
where
S: Service<R>,
S::Response: 'static,
S::Future: 'static,
S::Error: 'static,
{
InOrderService::new(service)
}
}
impl<S> Default for InOrder<S>
impl<S, R> Transform<R, S> for InOrder<S>
where
S: Service,
S: Service<R>,
S::Response: 'static,
S::Future: 'static,
S::Error: 'static,
{
fn default() -> Self {
Self::new()
}
}
impl<S> Transform<S> for InOrder<S>
where
S: Service,
S::Response: 'static,
S::Future: 'static,
S::Error: 'static,
{
type Request = S::Request;
type Response = S::Response;
type Error = InOrderError<S::Error>;
type InitError = Void;
type Transform = InOrderService<S>;
type Transform = InOrderService<S, R>;
type Future = FutureResult<Self::Transform, Self::InitError>;
fn new_transform(&self, service: S) -> Self::Future {
@ -99,15 +92,15 @@ where
}
}
pub struct InOrderService<S: Service> {
pub struct InOrderService<S: Service<R>, R> {
service: S,
task: Rc<AtomicTask>,
acks: VecDeque<Record<S::Response, S::Error>>,
}
impl<S> InOrderService<S>
impl<S, R> InOrderService<S, R>
where
S: Service,
S: Service<R>,
S::Response: 'static,
S::Future: 'static,
S::Error: 'static,
@ -121,17 +114,16 @@ where
}
}
impl<S> Service for InOrderService<S>
impl<S, R> Service<R> for InOrderService<S, R>
where
S: Service,
S: Service<R>,
S::Response: 'static,
S::Future: 'static,
S::Error: 'static,
{
type Request = S::Request;
type Response = S::Response;
type Error = InOrderError<S::Error>;
type Future = InOrderServiceResponse<S>;
type Future = InOrderServiceResponse<S, R>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
// poll_ready could be called from different task
@ -156,7 +148,7 @@ where
Ok(Async::Ready(()))
}
fn call(&mut self, request: S::Request) -> Self::Future {
fn call(&mut self, request: R) -> Self::Future {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
self.acks.push_back(Record { rx: rx1, tx: tx2 });
@ -173,11 +165,11 @@ where
}
#[doc(hidden)]
pub struct InOrderServiceResponse<S: Service> {
pub struct InOrderServiceResponse<S: Service<R>, R> {
rx: oneshot::Receiver<Result<S::Response, S::Error>>,
}
impl<S: Service> Future for InOrderServiceResponse<S> {
impl<S: Service<R>, R> Future for InOrderServiceResponse<S, R> {
type Item = S::Response;
type Error = InOrderError<S::Error>;
@ -204,8 +196,7 @@ mod tests {
struct Srv;
impl Service for Srv {
type Request = oneshot::Receiver<usize>;
impl Service<oneshot::Receiver<usize>> for Srv {
type Response = usize;
type Error = ();
type Future = Box<Future<Item = usize, Error = ()>>;
@ -219,11 +210,11 @@ mod tests {
}
}
struct SrvPoll<S: Service> {
struct SrvPoll<S: Service<oneshot::Receiver<usize>>> {
s: S,
}
impl<S: Service> Future for SrvPoll<S> {
impl<S: Service<oneshot::Receiver<usize>>> Future for SrvPoll<S> {
type Item = ();
type Error = ();

View File

@ -38,12 +38,12 @@ impl<S, T, E, C> StreamNewService<S, T, E, C>
where
C: Clone,
S: IntoStream,
T: NewService<C, Request = Request<S>, Response = (), Error = E, InitError = E>,
T: NewService<Request<S>, C, Response = (), Error = E, InitError = E>,
T::Future: 'static,
T::Service: 'static,
<T::Service as Service>::Future: 'static,
<T::Service as Service<Request<S>>>::Future: 'static,
{
pub fn new<F: IntoNewService<T, C>>(factory: F) -> Self {
pub fn new<F: IntoNewService<T, Request<S>, C>>(factory: F) -> Self {
Self {
factory: Rc::new(factory.into_new_service()),
_t: PhantomData,
@ -60,16 +60,15 @@ impl<S, T, E, C> Clone for StreamNewService<S, T, E, C> {
}
}
impl<S, T, E, C> NewService<C> for StreamNewService<S, T, E, C>
impl<S, T, E, C> NewService<S, C> for StreamNewService<S, T, E, C>
where
C: Clone,
S: IntoStream + 'static,
T: NewService<C, Request = Request<S>, Response = (), Error = E, InitError = E>,
T: NewService<Request<S>, C, Response = (), Error = E, InitError = E>,
T::Future: 'static,
T::Service: 'static,
<T::Service as Service>::Future: 'static,
<T::Service as Service<Request<S>>>::Future: 'static,
{
type Request = S;
type Response = ();
type Error = E;
type InitError = E;
@ -91,16 +90,15 @@ pub struct StreamService<S, T, E, C = ()> {
_t: PhantomData<(S, E)>,
}
impl<S, T, E, C> Service for StreamService<S, T, E, C>
impl<S, T, E, C> Service<S> for StreamService<S, T, E, C>
where
S: IntoStream + 'static,
T: NewService<C, Request = Request<S>, Response = (), Error = E, InitError = E>,
T: NewService<Request<S>, C, Response = (), Error = E, InitError = E>,
T::Future: 'static,
T::Service: 'static,
<T::Service as Service>::Future: 'static,
<T::Service as Service<Request<S>>>::Future: 'static,
C: Clone,
{
type Request = S;
type Response = ();
type Error = E;
type Future = Box<Future<Item = (), Error = E>>;
@ -121,7 +119,7 @@ where
pub struct StreamDispatcher<S, T>
where
S: IntoStream + 'static,
T: Service<Request = Request<S>, Response = ()> + 'static,
T: Service<Request<S>, Response = ()> + 'static,
T::Future: 'static,
{
stream: S,
@ -133,13 +131,13 @@ where
impl<S, T> StreamDispatcher<S, T>
where
S: Stream,
T: Service<Request = Request<S>, Response = ()>,
T: Service<Request<S>, Response = ()>,
T::Future: 'static,
{
pub fn new<F1, F2>(stream: F1, service: F2) -> Self
where
F1: IntoStream<Stream = S, Item = S::Item, Error = S::Error>,
F2: IntoService<T>,
F2: IntoService<T, Request<S>>,
{
let (err_tx, err_rx) = mpsc::unbounded();
StreamDispatcher {
@ -154,7 +152,7 @@ where
impl<S, T> Future for StreamDispatcher<S, T>
where
S: Stream,
T: Service<Request = Request<S>, Response = ()>,
T: Service<Request<S>, Response = ()>,
T::Future: 'static,
{
type Item = ();
@ -232,8 +230,7 @@ impl<T> Clone for TakeItem<T> {
}
}
impl<T: Stream> NewService<()> for TakeItem<T> {
type Request = T;
impl<T: Stream> NewService<T, ()> for TakeItem<T> {
type Response = (Option<T::Item>, T);
type Error = T::Error;
type InitError = ();
@ -256,8 +253,7 @@ impl<T> Clone for TakeItemService<T> {
}
}
impl<T: Stream> Service for TakeItemService<T> {
type Request = T;
impl<T: Stream> Service<T> for TakeItemService<T> {
type Response = (Option<T::Item>, T);
type Error = T::Error;
type Future = TakeItemServiceResponse<T>;

View File

@ -42,7 +42,6 @@ impl Default for LowResTime {
}
impl NewService<()> for LowResTime {
type Request = ();
type Response = Instant;
type Error = Void;
type InitError = Void;
@ -88,8 +87,7 @@ impl LowResTimeService {
}
}
impl Service for LowResTimeService {
type Request = ();
impl Service<()> for LowResTimeService {
type Response = Instant;
type Error = Void;
type Future = FutureResult<Self::Response, Self::Error>;

View File

@ -80,11 +80,10 @@ impl<E> Clone for Timeout<E> {
}
}
impl<S, E> Transform<S> for Timeout<E>
impl<S, R, E> Transform<R, S> for Timeout<E>
where
S: Service,
S: Service<R>,
{
type Request = S::Request;
type Response = S::Response;
type Error = TimeoutError<S::Error>;
type InitError = E;
@ -112,20 +111,19 @@ impl<S> TimeoutService<S> {
}
}
impl<S> Service for TimeoutService<S>
impl<S, R> Service<R> for TimeoutService<S>
where
S: Service,
S: Service<R>,
{
type Request = S::Request;
type Response = S::Response;
type Error = TimeoutError<S::Error>;
type Future = TimeoutServiceResponse<S>;
type Future = TimeoutServiceResponse<S, R>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.service.poll_ready().map_err(TimeoutError::Service)
}
fn call(&mut self, request: S::Request) -> Self::Future {
fn call(&mut self, request: R) -> Self::Future {
TimeoutServiceResponse {
fut: self.service.call(request),
sleep: Delay::new(clock::now() + self.timeout),
@ -135,14 +133,14 @@ where
/// `TimeoutService` response future
#[derive(Debug)]
pub struct TimeoutServiceResponse<T: Service> {
pub struct TimeoutServiceResponse<T: Service<R>, R> {
fut: T::Future,
sleep: Delay,
}
impl<T> Future for TimeoutServiceResponse<T>
impl<T, R> Future for TimeoutServiceResponse<T, R>
where
T: Service,
T: Service<R>,
{
type Item = T::Response;
type Error = TimeoutError<T::Error>;
@ -177,8 +175,7 @@ mod tests {
struct SleepService(Duration);
impl Service for SleepService {
type Request = ();
impl Service<()> for SleepService {
type Response = ();
type Error = ();
type Future = Box<Future<Item = (), Error = ()>>;