2019-02-03 10:42:27 -08:00
|
|
|
use actix_service::{NewTransform, Service, Transform};
|
|
|
|
use futures::future::{ok, FutureResult};
|
|
|
|
use futures::{Async, Future, Poll};
|
2018-09-14 13:30:29 -07:00
|
|
|
|
|
|
|
use super::counter::{Counter, CounterGuard};
|
|
|
|
|
|
|
|
/// InFlight - new service for service that can limit number of in-flight
|
|
|
|
/// async requests.
|
|
|
|
///
|
|
|
|
/// Default number of in-flight requests is 15
|
2019-02-03 10:42:27 -08:00
|
|
|
pub struct InFlight {
|
2018-09-14 13:30:29 -07:00
|
|
|
max_inflight: usize,
|
|
|
|
}
|
|
|
|
|
2019-02-03 10:42:27 -08:00
|
|
|
impl InFlight {
|
|
|
|
pub fn new(max: usize) -> Self {
|
|
|
|
Self { max_inflight: max }
|
2018-09-14 13:30:29 -07:00
|
|
|
}
|
2019-02-03 10:42:27 -08:00
|
|
|
}
|
2018-09-14 13:30:29 -07:00
|
|
|
|
2019-02-03 10:42:27 -08:00
|
|
|
impl Default for InFlight {
|
|
|
|
fn default() -> Self {
|
|
|
|
Self::new(15)
|
2018-09-14 13:30:29 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-02-03 10:42:27 -08:00
|
|
|
impl<T: Service> NewTransform<T> for InFlight {
|
2019-02-01 19:53:13 -08:00
|
|
|
type Request = T::Request;
|
2018-09-14 13:30:29 -07:00
|
|
|
type Response = T::Response;
|
|
|
|
type Error = T::Error;
|
2019-02-03 10:42:27 -08:00
|
|
|
type InitError = ();
|
|
|
|
type Transform = InFlightService;
|
|
|
|
type Future = FutureResult<Self::Transform, Self::InitError>;
|
2018-09-14 13:30:29 -07:00
|
|
|
|
2019-02-03 10:42:27 -08:00
|
|
|
fn new_transform(&self) -> Self::Future {
|
|
|
|
ok(InFlightService::new(self.max_inflight))
|
2018-09-14 13:30:29 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-02-03 10:42:27 -08:00
|
|
|
pub struct InFlightService {
|
2018-09-14 13:30:29 -07:00
|
|
|
count: Counter,
|
|
|
|
}
|
|
|
|
|
2019-02-03 10:42:27 -08:00
|
|
|
impl InFlightService {
|
|
|
|
pub fn new(max: usize) -> Self {
|
2018-09-14 13:30:29 -07:00
|
|
|
Self {
|
|
|
|
count: Counter::new(max),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-02-03 10:42:27 -08:00
|
|
|
impl<T> Transform<T> for InFlightService
|
2018-11-29 16:56:15 -10:00
|
|
|
where
|
2019-02-01 19:53:13 -08:00
|
|
|
T: Service,
|
2018-11-29 16:56:15 -10:00
|
|
|
{
|
2019-02-01 19:53:13 -08:00
|
|
|
type Request = T::Request;
|
2018-09-14 13:30:29 -07:00
|
|
|
type Response = T::Response;
|
|
|
|
type Error = T::Error;
|
2019-02-01 19:53:13 -08:00
|
|
|
type Future = InFlightServiceResponse<T>;
|
2018-09-14 13:30:29 -07:00
|
|
|
|
|
|
|
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
2019-02-03 10:42:27 -08:00
|
|
|
if !self.count.available() {
|
2019-02-01 15:15:53 -08:00
|
|
|
log::trace!("InFlight limit exceeded");
|
2018-09-14 13:30:29 -07:00
|
|
|
return Ok(Async::NotReady);
|
2019-02-03 10:42:27 -08:00
|
|
|
} else {
|
|
|
|
return Ok(Async::Ready(()));
|
2018-09-14 13:30:29 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-02-03 10:42:27 -08:00
|
|
|
fn call(&mut self, req: T::Request, service: &mut T) -> Self::Future {
|
2018-09-14 13:30:29 -07:00
|
|
|
InFlightServiceResponse {
|
2019-02-03 10:42:27 -08:00
|
|
|
fut: service.call(req),
|
2018-11-29 16:56:15 -10:00
|
|
|
_guard: self.count.get(),
|
2018-09-14 13:30:29 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[doc(hidden)]
|
2019-02-01 19:53:13 -08:00
|
|
|
pub struct InFlightServiceResponse<T: Service> {
|
2018-09-14 13:30:29 -07:00
|
|
|
fut: T::Future,
|
2018-11-29 16:56:15 -10:00
|
|
|
_guard: CounterGuard,
|
2018-09-14 13:30:29 -07:00
|
|
|
}
|
|
|
|
|
2019-02-01 19:53:13 -08:00
|
|
|
impl<T: Service> Future for InFlightServiceResponse<T> {
|
2018-09-14 13:30:29 -07:00
|
|
|
type Item = T::Response;
|
|
|
|
type Error = T::Error;
|
|
|
|
|
|
|
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
|
|
|
self.fut.poll()
|
|
|
|
}
|
|
|
|
}
|
2019-02-03 10:42:27 -08:00
|
|
|
|
|
|
|
#[cfg(test)]
|
|
|
|
mod tests {
|
|
|
|
use futures::future::lazy;
|
|
|
|
use futures::{Async, Poll};
|
|
|
|
|
|
|
|
use std::time::Duration;
|
|
|
|
|
|
|
|
use super::*;
|
|
|
|
use actix_service::{Blank, BlankNewService, NewService, Service, ServiceExt};
|
|
|
|
|
|
|
|
struct SleepService(Duration);
|
|
|
|
|
|
|
|
impl Service for SleepService {
|
|
|
|
type Request = ();
|
|
|
|
type Response = ();
|
|
|
|
type Error = ();
|
|
|
|
type Future = Box<Future<Item = (), Error = ()>>;
|
|
|
|
|
|
|
|
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
|
|
|
Ok(Async::Ready(()))
|
|
|
|
}
|
|
|
|
|
|
|
|
fn call(&mut self, _: ()) -> Self::Future {
|
|
|
|
Box::new(tokio_timer::sleep(self.0).map_err(|_| ()))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn test_transform() {
|
|
|
|
let wait_time = Duration::from_millis(50);
|
|
|
|
let _ = actix_rt::System::new("test").block_on(lazy(|| {
|
|
|
|
let mut srv = Blank::new().apply(InFlightService::new(1), SleepService(wait_time));
|
|
|
|
assert_eq!(srv.poll_ready(), Ok(Async::Ready(())));
|
|
|
|
|
|
|
|
let mut res = srv.call(());
|
|
|
|
let _ = res.poll();
|
|
|
|
assert_eq!(srv.poll_ready(), Ok(Async::NotReady));
|
|
|
|
|
|
|
|
drop(res);
|
|
|
|
assert_eq!(srv.poll_ready(), Ok(Async::Ready(())));
|
|
|
|
|
|
|
|
Ok::<_, ()>(())
|
|
|
|
}));
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn test_newtransform() {
|
|
|
|
let wait_time = Duration::from_millis(50);
|
|
|
|
let _ = actix_rt::System::new("test").block_on(lazy(|| {
|
|
|
|
let srv =
|
|
|
|
BlankNewService::new().apply(InFlight::new(1), || Ok(SleepService(wait_time)));
|
|
|
|
|
|
|
|
if let Async::Ready(mut srv) = srv.new_service().poll().unwrap() {
|
|
|
|
assert_eq!(srv.poll_ready(), Ok(Async::Ready(())));
|
|
|
|
|
|
|
|
let mut res = srv.call(());
|
|
|
|
let _ = res.poll();
|
|
|
|
assert_eq!(srv.poll_ready(), Ok(Async::NotReady));
|
|
|
|
|
|
|
|
drop(res);
|
|
|
|
assert_eq!(srv.poll_ready(), Ok(Async::Ready(())));
|
|
|
|
} else {
|
|
|
|
panic!()
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok::<_, ()>(())
|
|
|
|
}));
|
|
|
|
}
|
|
|
|
}
|