1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-06-26 19:47:43 +02:00

Use associated type for NewService config

This commit is contained in:
Nikolay Kim
2019-05-12 06:03:50 -07:00
parent 76c317e0b2
commit f0776fca94
46 changed files with 810 additions and 1010 deletions

View File

@ -1,5 +1,18 @@
# Changes
## [0.4.0] - 2019-05-11
### Changed
* Change `Either` to handle two nexted services
* Upgrade actix-service 0.4
### Deleted
* Framed related services
* Stream related services
## [0.3.5] - 2019-04-04

View File

@ -21,7 +21,8 @@ path = "src/lib.rs"
actix-service = "0.3.3"
actix-codec = "0.1.1"
bytes = "0.4"
futures = "0.1.24"
either = "1.5.2"
futures = "0.1.25"
tokio-timer = "0.2.8"
tokio-current-thread = "0.1.4"
log = "0.4"

View File

@ -1,5 +1,5 @@
//! Contains `Either` service and related types and functions.
use actix_service::{NewService, Service};
use actix_service::{IntoNewService, NewService, Service};
use futures::{future, try_ready, Async, Future, IntoFuture, Poll};
/// Combine two different service types into a single type.
@ -7,16 +7,16 @@ use futures::{future, try_ready, Async, Future, IntoFuture, Poll};
/// Both services must be of the same request, response, and error types.
/// `EitherService` is useful for handling conditional branching in service
/// middleware to different inner service types.
pub enum EitherService<A, B> {
A(A),
B(B),
pub struct EitherService<A, B> {
left: A,
right: B,
}
impl<A: Clone, B: Clone> Clone for EitherService<A, B> {
fn clone(&self) -> Self {
match self {
EitherService::A(srv) => EitherService::A(srv.clone()),
EitherService::B(srv) => EitherService::B(srv.clone()),
EitherService {
left: self.left.clone(),
right: self.right.clone(),
}
}
}
@ -24,129 +24,130 @@ impl<A: Clone, B: Clone> Clone for EitherService<A, B> {
impl<A, B> Service for EitherService<A, B>
where
A: Service,
B: Service<Request = A::Request, Response = A::Response, Error = A::Error>,
B: Service<Response = A::Response, Error = A::Error>,
{
type Request = A::Request;
type Request = either::Either<A::Request, B::Request>;
type Response = A::Response;
type Error = A::Error;
type Future = future::Either<A::Future, B::Future>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
match self {
EitherService::A(ref mut inner) => inner.poll_ready(),
EitherService::B(ref mut inner) => inner.poll_ready(),
let left = self.left.poll_ready()?;
let right = self.right.poll_ready()?;
if left.is_ready() && right.is_ready() {
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
}
}
fn call(&mut self, req: A::Request) -> Self::Future {
match self {
EitherService::A(ref mut inner) => future::Either::A(inner.call(req)),
EitherService::B(ref mut inner) => future::Either::B(inner.call(req)),
fn call(&mut self, req: either::Either<A::Request, B::Request>) -> Self::Future {
match req {
either::Either::Left(req) => future::Either::A(self.left.call(req)),
either::Either::Right(req) => future::Either::B(self.right.call(req)),
}
}
}
/// Combine two different new service types into a single type.
pub enum Either<A, B> {
A(A),
B(B),
/// Combine two different new service types into a single service.
pub struct Either<A, B> {
left: A,
right: B,
}
impl<A, B> Either<A, B> {
pub fn new_a<C>(srv: A) -> Self
pub fn new_a<F>(srv: F) -> Either<A, ()>
where
A: NewService<C>,
B: NewService<
C,
Request = A::Request,
Response = A::Response,
Error = A::Error,
InitError = A::InitError,
>,
A: NewService,
F: IntoNewService<A>,
{
Either::A(srv)
Either {
left: srv.into_new_service(),
right: (),
}
}
pub fn new_b<C>(srv: B) -> Self
pub fn new_b<F>(srv: F) -> Either<(), B>
where
A: NewService<C>,
B: NewService<
C,
Request = A::Request,
Response = A::Response,
Error = A::Error,
InitError = A::InitError,
>,
B: NewService,
F: IntoNewService<B>,
{
Either::B(srv)
Either {
left: (),
right: srv.into_new_service(),
}
}
}
impl<A, B, C> NewService<C> for Either<A, B>
impl<A, B> NewService for Either<A, B>
where
A: NewService<C>,
A: NewService,
B: NewService<
C,
Request = A::Request,
Config = A::Config,
Response = A::Response,
Error = A::Error,
InitError = A::InitError,
>,
{
type Request = A::Request;
type Request = either::Either<A::Request, B::Request>;
type Response = A::Response;
type Error = A::Error;
type InitError = A::InitError;
type Config = A::Config;
type Service = EitherService<A::Service, B::Service>;
type Future = EitherNewService<A, B, C>;
type Future = EitherNewService<A, B>;
fn new_service(&self, cfg: &C) -> Self::Future {
match self {
Either::A(ref inner) => EitherNewService::A(inner.new_service(cfg)),
Either::B(ref inner) => EitherNewService::B(inner.new_service(cfg)),
fn new_service(&self, cfg: &A::Config) -> Self::Future {
EitherNewService {
left: None,
right: None,
left_fut: self.left.new_service(cfg),
right_fut: self.right.new_service(cfg),
}
}
}
impl<A: Clone, B: Clone> Clone for Either<A, B> {
fn clone(&self) -> Self {
match self {
Either::A(srv) => Either::A(srv.clone()),
Either::B(srv) => Either::B(srv.clone()),
Self {
left: self.left.clone(),
right: self.right.clone(),
}
}
}
#[doc(hidden)]
pub enum EitherNewService<A: NewService<C>, B: NewService<C>, C> {
A(<A::Future as IntoFuture>::Future),
B(<B::Future as IntoFuture>::Future),
pub struct EitherNewService<A: NewService, B: NewService> {
left: Option<A::Service>,
right: Option<B::Service>,
left_fut: <A::Future as IntoFuture>::Future,
right_fut: <B::Future as IntoFuture>::Future,
}
impl<A, B, C> Future for EitherNewService<A, B, C>
impl<A, B> Future for EitherNewService<A, B>
where
A: NewService<C>,
B: NewService<
C,
Request = A::Request,
Response = A::Response,
Error = A::Error,
InitError = A::InitError,
>,
A: NewService,
B: NewService<Response = A::Response, Error = A::Error, InitError = A::InitError>,
{
type Item = EitherService<A::Service, B::Service>;
type Error = A::InitError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self {
EitherNewService::A(ref mut fut) => {
let service = try_ready!(fut.poll());
Ok(Async::Ready(EitherService::A(service)))
}
EitherNewService::B(ref mut fut) => {
let service = try_ready!(fut.poll());
Ok(Async::Ready(EitherService::B(service)))
}
if self.left.is_none() {
self.left = Some(try_ready!(self.left_fut.poll()));
}
if self.right.is_none() {
self.right = Some(try_ready!(self.right_fut.poll()));
}
if self.left.is_some() && self.right.is_some() {
Ok(Async::Ready(EitherService {
left: self.left.take().unwrap(),
right: self.right.take().unwrap(),
}))
} else {
Ok(Async::NotReady)
}
}
}

View File

@ -1,14 +1,12 @@
//! Framed dispatcher service and related utilities
use std::collections::VecDeque;
use std::marker::PhantomData;
use std::mem;
use std::{fmt, mem};
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
use actix_service::{IntoNewService, IntoService, NewService, Service};
use futures::future::{ok, FutureResult};
use actix_service::{IntoService, Service};
use futures::task::AtomicTask;
use futures::unsync::mpsc;
use futures::{Async, Future, IntoFuture, Poll, Sink, Stream};
use futures::{Async, Future, Poll, Sink, Stream};
use log::debug;
use crate::cell::Cell;
@ -16,156 +14,6 @@ use crate::cell::Cell;
type Request<U> = <U as Decoder>::Item;
type Response<U> = <U as Encoder>::Item;
pub struct FramedNewService<S, T, U, C> {
factory: S,
_t: PhantomData<(T, U, C)>,
}
impl<S, T, U, C> FramedNewService<S, T, U, C>
where
C: Clone,
S: NewService<C, Request = Request<U>, Response = Response<U>>,
S::Error: 'static,
<S::Service as Service>::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
pub fn new<F1: IntoNewService<S, C>>(factory: F1) -> Self {
Self {
factory: factory.into_new_service(),
_t: PhantomData,
}
}
}
impl<S, T, U, C> Clone for FramedNewService<S, T, U, C>
where
S: Clone,
{
fn clone(&self) -> Self {
Self {
factory: self.factory.clone(),
_t: PhantomData,
}
}
}
impl<S, T, U, C> NewService<C> for FramedNewService<S, T, U, C>
where
C: Clone,
S: NewService<C, Request = Request<U>, Response = Response<U>> + Clone,
S::Error: 'static,
<S::Service as Service>::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
type Request = Framed<T, U>;
type Response = FramedTransport<S::Service, T, U>;
type Error = S::InitError;
type InitError = S::InitError;
type Service = FramedService<S, T, U, C>;
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self, cfg: &C) -> Self::Future {
ok(FramedService {
factory: self.factory.clone(),
config: cfg.clone(),
_t: PhantomData,
})
}
}
pub struct FramedService<S, T, U, C> {
factory: S,
config: C,
_t: PhantomData<(T, U)>,
}
impl<S, T, U, C> Clone for FramedService<S, T, U, C>
where
S: Clone,
C: Clone,
{
fn clone(&self) -> Self {
Self {
factory: self.factory.clone(),
config: self.config.clone(),
_t: PhantomData,
}
}
}
impl<S, T, U, C> Service for FramedService<S, T, U, C>
where
S: NewService<C, Request = Request<U>, Response = Response<U>>,
S::Error: 'static,
<S::Service as Service>::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
C: Clone,
{
type Request = Framed<T, U>;
type Response = FramedTransport<S::Service, T, U>;
type Error = S::InitError;
type Future = FramedServiceResponseFuture<S, T, U, C>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, req: Framed<T, U>) -> Self::Future {
FramedServiceResponseFuture {
fut: self.factory.new_service(&self.config),
framed: Some(req),
}
}
}
#[doc(hidden)]
pub struct FramedServiceResponseFuture<S, T, U, C>
where
S: NewService<C, Request = Request<U>, Response = Response<U>>,
S::Error: 'static,
<S::Service as Service>::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
fut: <S::Future as IntoFuture>::Future,
framed: Option<Framed<T, U>>,
}
impl<S, T, U, C> Future for FramedServiceResponseFuture<S, T, U, C>
where
S: NewService<C, Request = Request<U>, Response = Response<U>>,
S::Error: 'static,
<S::Service as Service>::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
type Item = FramedTransport<S::Service, T, U>;
type Error = S::InitError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.fut.poll()? {
Async::NotReady => Ok(Async::NotReady),
Async::Ready(service) => Ok(Async::Ready(FramedTransport::new(
self.framed.take().unwrap(),
service,
))),
}
}
}
/// Framed transport errors
pub enum FramedTransportError<E, U: Encoder + Decoder> {
Service(E),
@ -179,6 +27,42 @@ impl<E, U: Encoder + Decoder> From<E> for FramedTransportError<E, U> {
}
}
impl<E, U: Encoder + Decoder> fmt::Debug for FramedTransportError<E, U>
where
E: fmt::Debug,
<U as Encoder>::Error: fmt::Debug,
<U as Decoder>::Error: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match *self {
FramedTransportError::Service(ref e) => {
write!(fmt, "FramedTransportError::Service({:?})", e)
}
FramedTransportError::Encoder(ref e) => {
write!(fmt, "FramedTransportError::Encoder({:?})", e)
}
FramedTransportError::Decoder(ref e) => {
write!(fmt, "FramedTransportError::Encoder({:?})", e)
}
}
}
}
impl<E, U: Encoder + Decoder> fmt::Display for FramedTransportError<E, U>
where
E: fmt::Display,
<U as Encoder>::Error: fmt::Debug,
<U as Decoder>::Error: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match *self {
FramedTransportError::Service(ref e) => write!(fmt, "{}", e),
FramedTransportError::Encoder(ref e) => write!(fmt, "{:?}", e),
FramedTransportError::Decoder(ref e) => write!(fmt, "{:?}", e),
}
}
}
pub enum FramedMessage<T> {
Message(T),
Close,
@ -444,78 +328,3 @@ where
}
}
}
pub struct IntoFramed<T, U, F>
where
T: AsyncRead + AsyncWrite,
F: Fn() -> U + Send + Clone + 'static,
U: Encoder + Decoder,
{
factory: F,
_t: PhantomData<(T,)>,
}
impl<T, U, F> IntoFramed<T, U, F>
where
T: AsyncRead + AsyncWrite,
F: Fn() -> U + Send + Clone + 'static,
U: Encoder + Decoder,
{
pub fn new(factory: F) -> Self {
IntoFramed {
factory,
_t: PhantomData,
}
}
}
impl<T, C, U, F> NewService<C> for IntoFramed<T, U, F>
where
T: AsyncRead + AsyncWrite,
F: Fn() -> U + Send + Clone + 'static,
U: Encoder + Decoder,
{
type Request = T;
type Response = Framed<T, U>;
type Error = ();
type InitError = ();
type Service = IntoFramedService<T, U, F>;
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self, _: &C) -> Self::Future {
ok(IntoFramedService {
factory: self.factory.clone(),
_t: PhantomData,
})
}
}
pub struct IntoFramedService<T, U, F>
where
T: AsyncRead + AsyncWrite,
F: Fn() -> U + Send + Clone + 'static,
U: Encoder + Decoder,
{
factory: F,
_t: PhantomData<(T,)>,
}
impl<T, U, F> Service for IntoFramedService<T, U, F>
where
T: AsyncRead + AsyncWrite,
F: Fn() -> U + Send + Clone + 'static,
U: Encoder + Decoder,
{
type Request = T;
type Response = Framed<T, U>;
type Error = ();
type Future = FutureResult<Self::Response, Self::Error>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, req: T) -> Self::Future {
ok(Framed::new(req, (self.factory)()))
}
}

View File

@ -1,4 +1,6 @@
use actix_service::{IntoService, Service, Transform, Void};
use std::convert::Infallible;
use actix_service::{IntoService, Service, Transform};
use futures::future::{ok, FutureResult};
use futures::{Async, Future, Poll};
@ -28,7 +30,7 @@ impl<S: Service> Transform<S> for InFlight {
type Request = S::Request;
type Response = S::Response;
type Error = S::Error;
type InitError = Void;
type InitError = Infallible;
type Transform = InFlightService<S>;
type Future = FutureResult<Self::Transform, Self::InitError>;

View File

@ -1,7 +1,8 @@
use std::convert::Infallible;
use std::marker::PhantomData;
use std::time::{Duration, Instant};
use actix_service::{NewService, Service, Void};
use actix_service::{NewService, Service};
use futures::future::{ok, FutureResult};
use futures::{Async, Future, Poll};
use tokio_timer::Delay;
@ -43,14 +44,15 @@ where
}
}
impl<R, E, F> NewService<()> for KeepAlive<R, E, F>
impl<R, E, F> NewService for KeepAlive<R, E, F>
where
F: Fn() -> E + Clone,
{
type Request = R;
type Response = R;
type Error = E;
type InitError = Void;
type InitError = Infallible;
type Config = ();
type Service = KeepAliveService<R, E, F>;
type Future = FutureResult<Self::Service, Self::InitError>;

View File

@ -1,9 +1,10 @@
use std::collections::VecDeque;
use std::convert::Infallible;
use std::fmt;
use std::marker::PhantomData;
use std::rc::Rc;
use actix_service::{IntoService, Service, Transform, Void};
use actix_service::{IntoService, Service, Transform};
use futures::future::{ok, FutureResult};
use futures::task::AtomicTask;
use futures::unsync::oneshot;
@ -90,7 +91,7 @@ where
type Request = S::Request;
type Response = S::Response;
type Error = InOrderError<S::Error>;
type InitError = Void;
type InitError = Infallible;
type Transform = InOrderService<S>;
type Future = FutureResult<Self::Transform, Self::InitError>;

View File

@ -1,10 +1,9 @@
use std::marker::PhantomData;
use std::rc::Rc;
use actix_service::{IntoNewService, IntoService, NewService, Service};
use futures::future::{ok, Future, FutureResult};
use actix_service::{IntoService, NewService, Service};
use futures::unsync::mpsc;
use futures::{Async, Poll, Stream};
use futures::{Async, Future, Poll, Stream};
type Request<T> = Result<<T as IntoStream>::Item, <T as IntoStream>::Error>;
@ -29,76 +28,19 @@ where
}
}
pub struct StreamNewService<S, T, E, C> {
pub struct StreamService<S, T: NewService, E> {
factory: Rc<T>,
_t: PhantomData<(S, E, C)>,
}
impl<S, T, E, C> StreamNewService<S, T, E, C>
where
C: Clone,
S: IntoStream,
T: NewService<C, Request = Request<S>, Response = (), Error = E, InitError = E>,
T::Future: 'static,
T::Service: 'static,
<T::Service as Service>::Future: 'static,
{
pub fn new<F: IntoNewService<T, C>>(factory: F) -> Self {
Self {
factory: Rc::new(factory.into_new_service()),
_t: PhantomData,
}
}
}
impl<S, T, E, C> Clone for StreamNewService<S, T, E, C> {
fn clone(&self) -> Self {
Self {
factory: self.factory.clone(),
_t: PhantomData,
}
}
}
impl<S, T, E, C> NewService<C> for StreamNewService<S, T, E, C>
where
C: Clone,
S: IntoStream + 'static,
T: NewService<C, Request = Request<S>, Response = (), Error = E, InitError = E>,
T::Future: 'static,
T::Service: 'static,
<T::Service as Service>::Future: 'static,
{
type Request = S;
type Response = ();
type Error = E;
type InitError = E;
type Service = StreamService<S, T, E, C>;
type Future = FutureResult<Self::Service, E>;
fn new_service(&self, cfg: &C) -> Self::Future {
ok(StreamService {
factory: self.factory.clone(),
config: cfg.clone(),
_t: PhantomData,
})
}
}
pub struct StreamService<S, T, E, C = ()> {
factory: Rc<T>,
config: C,
config: T::Config,
_t: PhantomData<(S, E)>,
}
impl<S, T, E, C> Service for StreamService<S, T, E, C>
impl<S, T, E> Service for StreamService<S, T, E>
where
S: IntoStream + 'static,
T: NewService<C, Request = Request<S>, Response = (), Error = E, InitError = E>,
T: NewService<Request = Request<S>, Response = (), Error = E, InitError = E>,
T::Future: 'static,
T::Service: 'static,
<T::Service as Service>::Future: 'static,
C: Clone,
{
type Request = S;
type Response = ();
@ -207,83 +149,3 @@ impl<F: Future> Future for StreamDispatcherService<F> {
}
}
}
/// `NewService` that implements, read one item from the stream.
pub struct TakeItem<T> {
_t: PhantomData<T>,
}
impl<T> TakeItem<T> {
/// Create new `TakeRequest` instance.
pub fn new() -> Self {
TakeItem { _t: PhantomData }
}
}
impl<T> Default for TakeItem<T> {
fn default() -> Self {
TakeItem { _t: PhantomData }
}
}
impl<T> Clone for TakeItem<T> {
fn clone(&self) -> TakeItem<T> {
TakeItem { _t: PhantomData }
}
}
impl<T: Stream, C> NewService<C> for TakeItem<T> {
type Request = T;
type Response = (Option<T::Item>, T);
type Error = T::Error;
type InitError = ();
type Service = TakeItemService<T>;
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self, _: &C) -> Self::Future {
ok(TakeItemService { _t: PhantomData })
}
}
/// `NewService` that implements, read one request from framed object feature.
pub struct TakeItemService<T> {
_t: PhantomData<T>,
}
impl<T> Clone for TakeItemService<T> {
fn clone(&self) -> TakeItemService<T> {
TakeItemService { _t: PhantomData }
}
}
impl<T: Stream> Service for TakeItemService<T> {
type Request = T;
type Response = (Option<T::Item>, T);
type Error = T::Error;
type Future = TakeItemServiceResponse<T>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, req: T) -> Self::Future {
TakeItemServiceResponse { stream: Some(req) }
}
}
#[doc(hidden)]
pub struct TakeItemServiceResponse<T: Stream> {
stream: Option<T>,
}
impl<T: Stream> Future for TakeItemServiceResponse<T> {
type Item = (Option<T::Item>, T);
type Error = T::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.stream.as_mut().expect("Use after finish").poll()? {
Async::Ready(item) => Ok(Async::Ready((item, self.stream.take().unwrap()))),
Async::NotReady => Ok(Async::NotReady),
}
}
}

View File

@ -1,6 +1,7 @@
use std::convert::Infallible;
use std::time::{self, Duration, Instant};
use actix_service::{NewService, Service, Void};
use actix_service::{NewService, Service};
use futures::future::{ok, FutureResult};
use futures::{Async, Future, Poll};
use tokio_timer::sleep;
@ -41,11 +42,12 @@ impl Default for LowResTime {
}
}
impl NewService<()> for LowResTime {
impl NewService for LowResTime {
type Request = ();
type Response = Instant;
type Error = Void;
type InitError = Void;
type Error = Infallible;
type InitError = Infallible;
type Config = ();
type Service = LowResTimeService;
type Future = FutureResult<Self::Service, Self::InitError>;
@ -91,7 +93,7 @@ impl LowResTimeService {
impl Service for LowResTimeService {
type Request = ();
type Response = Instant;
type Error = Void;
type Error = Infallible;
type Future = FutureResult<Self::Response, Self::Error>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {