mirror of
https://github.com/fafhrd91/actix-net
synced 2025-06-27 00:27:43 +02:00
generalize apply combinator with transform trait
This commit is contained in:
@ -1,5 +1,6 @@
|
||||
use actix_service::{IntoNewService, IntoService, NewService, Service};
|
||||
use futures::{try_ready, Async, Future, Poll};
|
||||
use actix_service::{NewTransform, Service, Transform};
|
||||
use futures::future::{ok, FutureResult};
|
||||
use futures::{Async, Future, Poll};
|
||||
|
||||
use super::counter::{Counter, CounterGuard};
|
||||
|
||||
@ -7,98 +8,48 @@ use super::counter::{Counter, CounterGuard};
|
||||
/// async requests.
|
||||
///
|
||||
/// Default number of in-flight requests is 15
|
||||
pub struct InFlight<T> {
|
||||
factory: T,
|
||||
pub struct InFlight {
|
||||
max_inflight: usize,
|
||||
}
|
||||
|
||||
impl<T> InFlight<T> {
|
||||
pub fn new<F>(factory: F) -> Self
|
||||
where
|
||||
T: NewService,
|
||||
F: IntoNewService<T>,
|
||||
{
|
||||
Self {
|
||||
factory: factory.into_new_service(),
|
||||
max_inflight: 15,
|
||||
}
|
||||
}
|
||||
|
||||
/// Set max number of in-flight requests.
|
||||
///
|
||||
/// By default max in-flight requests is 15.
|
||||
pub fn max_inflight(mut self, max: usize) -> Self {
|
||||
self.max_inflight = max;
|
||||
self
|
||||
impl InFlight {
|
||||
pub fn new(max: usize) -> Self {
|
||||
Self { max_inflight: max }
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> NewService for InFlight<T>
|
||||
where
|
||||
T: NewService,
|
||||
{
|
||||
impl Default for InFlight {
|
||||
fn default() -> Self {
|
||||
Self::new(15)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Service> NewTransform<T> for InFlight {
|
||||
type Request = T::Request;
|
||||
type Response = T::Response;
|
||||
type Error = T::Error;
|
||||
type InitError = T::InitError;
|
||||
type Service = InFlightService<T::Service>;
|
||||
type Future = InFlightResponseFuture<T>;
|
||||
type InitError = ();
|
||||
type Transform = InFlightService;
|
||||
type Future = FutureResult<Self::Transform, Self::InitError>;
|
||||
|
||||
fn new_service(&self) -> Self::Future {
|
||||
InFlightResponseFuture {
|
||||
fut: self.factory.new_service(),
|
||||
max_inflight: self.max_inflight,
|
||||
}
|
||||
fn new_transform(&self) -> Self::Future {
|
||||
ok(InFlightService::new(self.max_inflight))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct InFlightResponseFuture<T: NewService> {
|
||||
fut: T::Future,
|
||||
max_inflight: usize,
|
||||
}
|
||||
|
||||
impl<T: NewService> Future for InFlightResponseFuture<T> {
|
||||
type Item = InFlightService<T::Service>;
|
||||
type Error = T::InitError;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
Ok(Async::Ready(InFlightService::with_max_inflight(
|
||||
self.max_inflight,
|
||||
try_ready!(self.fut.poll()),
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct InFlightService<T> {
|
||||
service: T,
|
||||
pub struct InFlightService {
|
||||
count: Counter,
|
||||
}
|
||||
|
||||
impl<T> InFlightService<T> {
|
||||
pub fn new<F>(service: F) -> Self
|
||||
where
|
||||
T: Service,
|
||||
F: IntoService<T>,
|
||||
{
|
||||
impl InFlightService {
|
||||
pub fn new(max: usize) -> Self {
|
||||
Self {
|
||||
service: service.into_service(),
|
||||
count: Counter::new(15),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_max_inflight<F>(max: usize, service: F) -> Self
|
||||
where
|
||||
T: Service,
|
||||
F: IntoService<T>,
|
||||
{
|
||||
Self {
|
||||
service: service.into_service(),
|
||||
count: Counter::new(max),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Service for InFlightService<T>
|
||||
impl<T> Transform<T> for InFlightService
|
||||
where
|
||||
T: Service,
|
||||
{
|
||||
@ -108,17 +59,17 @@ where
|
||||
type Future = InFlightServiceResponse<T>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
let res = self.service.poll_ready()?;
|
||||
if res.is_ready() && !self.count.available() {
|
||||
if !self.count.available() {
|
||||
log::trace!("InFlight limit exceeded");
|
||||
return Ok(Async::NotReady);
|
||||
} else {
|
||||
return Ok(Async::Ready(()));
|
||||
}
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
fn call(&mut self, req: T::Request) -> Self::Future {
|
||||
fn call(&mut self, req: T::Request, service: &mut T) -> Self::Future {
|
||||
InFlightServiceResponse {
|
||||
fut: self.service.call(req),
|
||||
fut: service.call(req),
|
||||
_guard: self.count.get(),
|
||||
}
|
||||
}
|
||||
@ -138,3 +89,73 @@ impl<T: Service> Future for InFlightServiceResponse<T> {
|
||||
self.fut.poll()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use futures::future::lazy;
|
||||
use futures::{Async, Poll};
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use super::*;
|
||||
use actix_service::{Blank, BlankNewService, NewService, Service, ServiceExt};
|
||||
|
||||
struct SleepService(Duration);
|
||||
|
||||
impl Service for SleepService {
|
||||
type Request = ();
|
||||
type Response = ();
|
||||
type Error = ();
|
||||
type Future = Box<Future<Item = (), Error = ()>>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, _: ()) -> Self::Future {
|
||||
Box::new(tokio_timer::sleep(self.0).map_err(|_| ()))
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_transform() {
|
||||
let wait_time = Duration::from_millis(50);
|
||||
let _ = actix_rt::System::new("test").block_on(lazy(|| {
|
||||
let mut srv = Blank::new().apply(InFlightService::new(1), SleepService(wait_time));
|
||||
assert_eq!(srv.poll_ready(), Ok(Async::Ready(())));
|
||||
|
||||
let mut res = srv.call(());
|
||||
let _ = res.poll();
|
||||
assert_eq!(srv.poll_ready(), Ok(Async::NotReady));
|
||||
|
||||
drop(res);
|
||||
assert_eq!(srv.poll_ready(), Ok(Async::Ready(())));
|
||||
|
||||
Ok::<_, ()>(())
|
||||
}));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_newtransform() {
|
||||
let wait_time = Duration::from_millis(50);
|
||||
let _ = actix_rt::System::new("test").block_on(lazy(|| {
|
||||
let srv =
|
||||
BlankNewService::new().apply(InFlight::new(1), || Ok(SleepService(wait_time)));
|
||||
|
||||
if let Async::Ready(mut srv) = srv.new_service().poll().unwrap() {
|
||||
assert_eq!(srv.poll_ready(), Ok(Async::Ready(())));
|
||||
|
||||
let mut res = srv.call(());
|
||||
let _ = res.poll();
|
||||
assert_eq!(srv.poll_ready(), Ok(Async::NotReady));
|
||||
|
||||
drop(res);
|
||||
assert_eq!(srv.poll_ready(), Ok(Async::Ready(())));
|
||||
} else {
|
||||
panic!()
|
||||
}
|
||||
|
||||
Ok::<_, ()>(())
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
@ -5,15 +5,14 @@
|
||||
use std::fmt;
|
||||
use std::time::Duration;
|
||||
|
||||
use actix_service::{NewService, Service};
|
||||
use futures::try_ready;
|
||||
use actix_service::{NewTransform, Service, Transform};
|
||||
use futures::future::{ok, FutureResult};
|
||||
use futures::{Async, Future, Poll};
|
||||
use tokio_timer::{clock, Delay};
|
||||
|
||||
/// Applies a timeout to requests.
|
||||
#[derive(Debug)]
|
||||
pub struct Timeout<T> {
|
||||
inner: T,
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Timeout {
|
||||
timeout: Duration,
|
||||
}
|
||||
|
||||
@ -25,6 +24,12 @@ pub enum TimeoutError<E> {
|
||||
Timeout,
|
||||
}
|
||||
|
||||
impl<E> From<E> for TimeoutError<E> {
|
||||
fn from(err: E) -> Self {
|
||||
TimeoutError::Service(err)
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: fmt::Debug> fmt::Debug for TimeoutError<E> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
match self {
|
||||
@ -34,107 +39,73 @@ impl<E: fmt::Debug> fmt::Debug for TimeoutError<E> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Timeout<T> {
|
||||
pub fn new(timeout: Duration, inner: T) -> Self
|
||||
where
|
||||
T: NewService + Clone,
|
||||
{
|
||||
Timeout { inner, timeout }
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Clone for Timeout<T>
|
||||
where
|
||||
T: Clone,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
Timeout {
|
||||
inner: self.inner.clone(),
|
||||
timeout: self.timeout,
|
||||
impl<E: PartialEq> PartialEq for TimeoutError<E> {
|
||||
fn eq(&self, other: &TimeoutError<E>) -> bool {
|
||||
match self {
|
||||
TimeoutError::Service(e1) => match other {
|
||||
TimeoutError::Service(e2) => e1 == e2,
|
||||
TimeoutError::Timeout => false,
|
||||
},
|
||||
TimeoutError::Timeout => match other {
|
||||
TimeoutError::Service(_) => false,
|
||||
TimeoutError::Timeout => true,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> NewService for Timeout<T>
|
||||
where
|
||||
T: NewService + Clone,
|
||||
{
|
||||
type Request = T::Request;
|
||||
type Response = T::Response;
|
||||
type Error = TimeoutError<T::Error>;
|
||||
type InitError = T::InitError;
|
||||
type Service = TimeoutService<T::Service>;
|
||||
type Future = TimeoutFut<T>;
|
||||
|
||||
fn new_service(&self) -> Self::Future {
|
||||
TimeoutFut {
|
||||
fut: self.inner.new_service(),
|
||||
timeout: self.timeout,
|
||||
}
|
||||
impl Timeout {
|
||||
pub fn new(timeout: Duration) -> Self {
|
||||
Timeout { timeout }
|
||||
}
|
||||
}
|
||||
|
||||
/// `Timeout` response future
|
||||
#[derive(Debug)]
|
||||
pub struct TimeoutFut<T: NewService> {
|
||||
fut: T::Future,
|
||||
timeout: Duration,
|
||||
}
|
||||
|
||||
impl<T> Future for TimeoutFut<T>
|
||||
impl<S> NewTransform<S> for Timeout
|
||||
where
|
||||
T: NewService,
|
||||
S: Service,
|
||||
{
|
||||
type Item = TimeoutService<T::Service>;
|
||||
type Error = T::InitError;
|
||||
type Request = S::Request;
|
||||
type Response = S::Response;
|
||||
type Error = TimeoutError<S::Error>;
|
||||
type InitError = ();
|
||||
type Transform = TimeoutService;
|
||||
type Future = FutureResult<Self::Transform, Self::InitError>;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
let service = try_ready!(self.fut.poll());
|
||||
Ok(Async::Ready(TimeoutService::new(self.timeout, service)))
|
||||
fn new_transform(&self) -> Self::Future {
|
||||
ok(TimeoutService {
|
||||
timeout: self.timeout,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Applies a timeout to requests.
|
||||
#[derive(Debug)]
|
||||
pub struct TimeoutService<T> {
|
||||
inner: T,
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TimeoutService {
|
||||
timeout: Duration,
|
||||
}
|
||||
|
||||
impl<T> TimeoutService<T> {
|
||||
pub fn new(timeout: Duration, inner: T) -> Self
|
||||
where
|
||||
T: Service,
|
||||
{
|
||||
TimeoutService { inner, timeout }
|
||||
impl TimeoutService {
|
||||
pub fn new(timeout: Duration) -> Self {
|
||||
TimeoutService { timeout }
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Clone> Clone for TimeoutService<T> {
|
||||
fn clone(&self) -> Self {
|
||||
TimeoutService {
|
||||
inner: self.inner.clone(),
|
||||
timeout: self.timeout,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Service for TimeoutService<T>
|
||||
impl<S> Transform<S> for TimeoutService
|
||||
where
|
||||
T: Service,
|
||||
S: Service,
|
||||
{
|
||||
type Request = T::Request;
|
||||
type Response = T::Response;
|
||||
type Error = TimeoutError<T::Error>;
|
||||
type Future = TimeoutServiceResponse<T>;
|
||||
type Request = S::Request;
|
||||
type Response = S::Response;
|
||||
type Error = TimeoutError<S::Error>;
|
||||
type Future = TimeoutServiceResponse<S>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
self.inner.poll_ready().map_err(TimeoutError::Service)
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, request: T::Request) -> Self::Future {
|
||||
fn call(&mut self, request: S::Request, service: &mut S) -> Self::Future {
|
||||
TimeoutServiceResponse {
|
||||
fut: self.inner.call(request),
|
||||
fut: service.call(request),
|
||||
sleep: Delay::new(clock::now() + self.timeout),
|
||||
}
|
||||
}
|
||||
@ -170,3 +141,74 @@ where
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use futures::future::lazy;
|
||||
use futures::{Async, Poll};
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use super::*;
|
||||
use actix_service::{Blank, BlankNewService, NewService, Service, ServiceExt};
|
||||
|
||||
struct SleepService(Duration);
|
||||
|
||||
impl Service for SleepService {
|
||||
type Request = ();
|
||||
type Response = ();
|
||||
type Error = ();
|
||||
type Future = Box<Future<Item = (), Error = ()>>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, _: ()) -> Self::Future {
|
||||
Box::new(tokio_timer::sleep(self.0).map_err(|_| ()))
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_success() {
|
||||
let resolution = Duration::from_millis(100);
|
||||
let wait_time = Duration::from_millis(50);
|
||||
|
||||
let res = actix_rt::System::new("test").block_on(lazy(|| {
|
||||
let mut timeout = Blank::default()
|
||||
.apply(TimeoutService::new(resolution), SleepService(wait_time));
|
||||
timeout.call(())
|
||||
}));
|
||||
assert_eq!(res, Ok(()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_timeout() {
|
||||
let resolution = Duration::from_millis(100);
|
||||
let wait_time = Duration::from_millis(150);
|
||||
|
||||
let res = actix_rt::System::new("test").block_on(lazy(|| {
|
||||
let mut timeout = Blank::default()
|
||||
.apply(TimeoutService::new(resolution), SleepService(wait_time));
|
||||
timeout.call(())
|
||||
}));
|
||||
assert_eq!(res, Err(TimeoutError::Timeout));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_timeout_newservice() {
|
||||
let resolution = Duration::from_millis(100);
|
||||
let wait_time = Duration::from_millis(150);
|
||||
|
||||
let res = actix_rt::System::new("test").block_on(lazy(|| {
|
||||
let timeout = BlankNewService::default()
|
||||
.apply(Timeout::new(resolution), || Ok(SleepService(wait_time)));
|
||||
if let Async::Ready(mut to) = timeout.new_service().poll().unwrap() {
|
||||
to.call(())
|
||||
} else {
|
||||
panic!()
|
||||
}
|
||||
}));
|
||||
assert_eq!(res, Err(TimeoutError::Timeout));
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user