mirror of
https://github.com/fafhrd91/actix-net
synced 2025-06-27 00:27:43 +02:00
simplify transform trait
This commit is contained in:
@ -1,4 +1,4 @@
|
||||
use actix_service::{NewTransform, Service, Transform, Void};
|
||||
use actix_service::{Service, Transform, Void};
|
||||
use futures::future::{ok, FutureResult};
|
||||
use futures::{Async, Future, Poll};
|
||||
|
||||
@ -24,32 +24,34 @@ impl Default for InFlight {
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Service, C> NewTransform<T, C> for InFlight {
|
||||
type Request = T::Request;
|
||||
type Response = T::Response;
|
||||
type Error = T::Error;
|
||||
impl<S: Service> Transform<S> for InFlight {
|
||||
type Request = S::Request;
|
||||
type Response = S::Response;
|
||||
type Error = S::Error;
|
||||
type InitError = Void;
|
||||
type Transform = InFlightService;
|
||||
type Transform = InFlightService<S>;
|
||||
type Future = FutureResult<Self::Transform, Self::InitError>;
|
||||
|
||||
fn new_transform(&self, _: &C) -> Self::Future {
|
||||
ok(InFlightService::new(self.max_inflight))
|
||||
fn new_transform(&self, service: S) -> Self::Future {
|
||||
ok(InFlightService::new(self.max_inflight, service))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct InFlightService {
|
||||
pub struct InFlightService<S> {
|
||||
count: Counter,
|
||||
service: S,
|
||||
}
|
||||
|
||||
impl InFlightService {
|
||||
pub fn new(max: usize) -> Self {
|
||||
impl<S> InFlightService<S> {
|
||||
pub fn new(max: usize, service: S) -> Self {
|
||||
Self {
|
||||
service,
|
||||
count: Counter::new(max),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Transform<T> for InFlightService
|
||||
impl<T> Service for InFlightService<T>
|
||||
where
|
||||
T: Service,
|
||||
{
|
||||
@ -59,6 +61,8 @@ where
|
||||
type Future = InFlightServiceResponse<T>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
self.service.poll_ready()?;
|
||||
|
||||
if !self.count.available() {
|
||||
log::trace!("InFlight limit exceeded");
|
||||
Ok(Async::NotReady)
|
||||
@ -67,9 +71,9 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
fn call(&mut self, req: T::Request, service: &mut T) -> Self::Future {
|
||||
fn call(&mut self, req: T::Request) -> Self::Future {
|
||||
InFlightServiceResponse {
|
||||
fut: service.call(req),
|
||||
fut: self.service.call(req),
|
||||
_guard: self.count.get(),
|
||||
}
|
||||
}
|
||||
@ -122,7 +126,8 @@ mod tests {
|
||||
fn test_transform() {
|
||||
let wait_time = Duration::from_millis(50);
|
||||
let _ = actix_rt::System::new("test").block_on(lazy(|| {
|
||||
let mut srv = Blank::new().apply(InFlightService::new(1), SleepService(wait_time));
|
||||
let mut srv =
|
||||
Blank::new().and_then(InFlightService::new(1, SleepService(wait_time)));
|
||||
assert_eq!(srv.poll_ready(), Ok(Async::Ready(())));
|
||||
|
||||
let mut res = srv.call(());
|
||||
|
@ -3,7 +3,7 @@ use std::fmt;
|
||||
use std::marker::PhantomData;
|
||||
use std::rc::Rc;
|
||||
|
||||
use actix_service::{NewTransform, Service, Transform, Void};
|
||||
use actix_service::{Service, Transform, Void};
|
||||
use futures::future::{ok, FutureResult};
|
||||
use futures::task::AtomicTask;
|
||||
use futures::unsync::oneshot;
|
||||
@ -63,13 +63,8 @@ where
|
||||
Self { _t: PhantomData }
|
||||
}
|
||||
|
||||
pub fn service() -> impl Transform<
|
||||
S,
|
||||
Request = S::Request,
|
||||
Response = S::Response,
|
||||
Error = InOrderError<S::Error>,
|
||||
> {
|
||||
InOrderService::new()
|
||||
pub fn service(service: S) -> InOrderService<S> {
|
||||
InOrderService::new(service)
|
||||
}
|
||||
}
|
||||
|
||||
@ -85,7 +80,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, C> NewTransform<S, C> for InOrder<S>
|
||||
impl<S> Transform<S> for InOrder<S>
|
||||
where
|
||||
S: Service,
|
||||
S::Response: 'static,
|
||||
@ -99,12 +94,13 @@ where
|
||||
type Transform = InOrderService<S>;
|
||||
type Future = FutureResult<Self::Transform, Self::InitError>;
|
||||
|
||||
fn new_transform(&self, _: &C) -> Self::Future {
|
||||
ok(InOrderService::new())
|
||||
fn new_transform(&self, service: S) -> Self::Future {
|
||||
ok(InOrderService::new(service))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct InOrderService<S: Service> {
|
||||
service: S,
|
||||
task: Rc<AtomicTask>,
|
||||
acks: VecDeque<Record<S::Response, S::Error>>,
|
||||
}
|
||||
@ -116,27 +112,16 @@ where
|
||||
S::Future: 'static,
|
||||
S::Error: 'static,
|
||||
{
|
||||
pub fn new() -> Self {
|
||||
pub fn new(service: S) -> Self {
|
||||
Self {
|
||||
service,
|
||||
acks: VecDeque::new(),
|
||||
task: Rc::new(AtomicTask::new()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Default for InOrderService<S>
|
||||
where
|
||||
S: Service,
|
||||
S::Response: 'static,
|
||||
S::Future: 'static,
|
||||
S::Error: 'static,
|
||||
{
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Transform<S> for InOrderService<S>
|
||||
impl<S> Service for InOrderService<S>
|
||||
where
|
||||
S: Service,
|
||||
S::Response: 'static,
|
||||
@ -149,8 +134,12 @@ where
|
||||
type Future = InOrderServiceResponse<S>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
// poll_ready could be called from different task
|
||||
self.task.register();
|
||||
|
||||
// check nested service
|
||||
self.service.poll_ready().map_err(InOrderError::Service)?;
|
||||
|
||||
// check acks
|
||||
while !self.acks.is_empty() {
|
||||
let rec = self.acks.front_mut().unwrap();
|
||||
@ -167,13 +156,13 @@ where
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, request: S::Request, service: &mut S) -> Self::Future {
|
||||
fn call(&mut self, request: S::Request) -> Self::Future {
|
||||
let (tx1, rx1) = oneshot::channel();
|
||||
let (tx2, rx2) = oneshot::channel();
|
||||
self.acks.push_back(Record { rx: rx1, tx: tx2 });
|
||||
|
||||
let task = self.task.clone();
|
||||
tokio_current_thread::spawn(service.call(request).then(move |res| {
|
||||
tokio_current_thread::spawn(self.service.call(request).then(move |res| {
|
||||
task.notify();
|
||||
let _ = tx1.send(res);
|
||||
Ok(())
|
||||
@ -257,7 +246,7 @@ mod tests {
|
||||
let rx3 = rx3;
|
||||
let tx_stop = tx_stop;
|
||||
let _ = actix_rt::System::new("test").block_on(lazy(move || {
|
||||
let mut srv = Blank::new().apply(InOrderService::new(), Srv);
|
||||
let mut srv = Blank::new().and_then(InOrderService::new(Srv));
|
||||
|
||||
let res1 = srv.call(rx1);
|
||||
let res2 = srv.call(rx2);
|
||||
|
@ -6,7 +6,7 @@ use std::fmt;
|
||||
use std::marker::PhantomData;
|
||||
use std::time::Duration;
|
||||
|
||||
use actix_service::{NewTransform, Service, Transform};
|
||||
use actix_service::{Service, Transform};
|
||||
use futures::future::{ok, FutureResult};
|
||||
use futures::{Async, Future, Poll};
|
||||
use tokio_timer::{clock, Delay};
|
||||
@ -80,7 +80,7 @@ impl<E> Clone for Timeout<E> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, C, E> NewTransform<S, C> for Timeout<E>
|
||||
impl<S, E> Transform<S> for Timeout<E>
|
||||
where
|
||||
S: Service,
|
||||
{
|
||||
@ -88,11 +88,12 @@ where
|
||||
type Response = S::Response;
|
||||
type Error = TimeoutError<S::Error>;
|
||||
type InitError = E;
|
||||
type Transform = TimeoutService;
|
||||
type Transform = TimeoutService<S>;
|
||||
type Future = FutureResult<Self::Transform, Self::InitError>;
|
||||
|
||||
fn new_transform(&self, _: &C) -> Self::Future {
|
||||
fn new_transform(&self, service: S) -> Self::Future {
|
||||
ok(TimeoutService {
|
||||
service,
|
||||
timeout: self.timeout,
|
||||
})
|
||||
}
|
||||
@ -100,17 +101,18 @@ where
|
||||
|
||||
/// Applies a timeout to requests.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TimeoutService {
|
||||
pub struct TimeoutService<S> {
|
||||
service: S,
|
||||
timeout: Duration,
|
||||
}
|
||||
|
||||
impl TimeoutService {
|
||||
pub fn new(timeout: Duration) -> Self {
|
||||
TimeoutService { timeout }
|
||||
impl<S> TimeoutService<S> {
|
||||
pub fn new(timeout: Duration, service: S) -> Self {
|
||||
TimeoutService { service, timeout }
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Transform<S> for TimeoutService
|
||||
impl<S> Service for TimeoutService<S>
|
||||
where
|
||||
S: Service,
|
||||
{
|
||||
@ -120,12 +122,12 @@ where
|
||||
type Future = TimeoutServiceResponse<S>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
Ok(Async::Ready(()))
|
||||
self.service.poll_ready().map_err(TimeoutError::Service)
|
||||
}
|
||||
|
||||
fn call(&mut self, request: S::Request, service: &mut S) -> Self::Future {
|
||||
fn call(&mut self, request: S::Request) -> Self::Future {
|
||||
TimeoutServiceResponse {
|
||||
fut: service.call(request),
|
||||
fut: self.service.call(request),
|
||||
sleep: Delay::new(clock::now() + self.timeout),
|
||||
}
|
||||
}
|
||||
@ -197,7 +199,7 @@ mod tests {
|
||||
|
||||
let res = actix_rt::System::new("test").block_on(lazy(|| {
|
||||
let mut timeout = Blank::default()
|
||||
.apply(TimeoutService::new(resolution), SleepService(wait_time));
|
||||
.and_then(TimeoutService::new(resolution, SleepService(wait_time)));
|
||||
timeout.call(())
|
||||
}));
|
||||
assert_eq!(res, Ok(()));
|
||||
@ -210,7 +212,7 @@ mod tests {
|
||||
|
||||
let res = actix_rt::System::new("test").block_on(lazy(|| {
|
||||
let mut timeout = Blank::default()
|
||||
.apply(TimeoutService::new(resolution), SleepService(wait_time));
|
||||
.and_then(TimeoutService::new(resolution, SleepService(wait_time)));
|
||||
timeout.call(())
|
||||
}));
|
||||
assert_eq!(res, Err(TimeoutError::Timeout));
|
||||
|
Reference in New Issue
Block a user