use futures::{Async, Future, Poll}; use super::counter::{Counter, CounterGuard}; use super::service::{IntoNewService, IntoService, NewService, Service}; /// InFlight - new service for service that can limit number of in-flight /// async requests. /// /// Default number of in-flight requests is 15 pub struct InFlight { factory: T, max_inflight: usize, } impl InFlight where T: NewService, { pub fn new>(factory: F) -> Self { Self { factory: factory.into_new_service(), max_inflight: 15, } } /// Set max number of in-flight requests. /// /// By default max in-flight requests is 15. pub fn max_inflight(mut self, max: usize) -> Self { self.max_inflight = max; self } } impl NewService for InFlight where T: NewService, { type Request = T::Request; type Response = T::Response; type Error = T::Error; type InitError = T::InitError; type Service = InFlightService; type Future = InFlightResponseFuture; fn new_service(&self) -> Self::Future { InFlightResponseFuture { fut: self.factory.new_service(), max_inflight: self.max_inflight, } } } pub struct InFlightResponseFuture { fut: T::Future, max_inflight: usize, } impl Future for InFlightResponseFuture { type Item = InFlightService; type Error = T::InitError; fn poll(&mut self) -> Poll { Ok(Async::Ready(InFlightService::with_max_inflight( self.max_inflight, try_ready!(self.fut.poll()), ))) } } pub struct InFlightService { service: T, count: Counter, } impl InFlightService { pub fn new>(service: F) -> Self { Self { service: service.into_service(), count: Counter::new(15), } } pub fn with_max_inflight>(max: usize, service: F) -> Self { Self { service: service.into_service(), count: Counter::new(max), } } } impl Service for InFlightService { type Request = T::Request; type Response = T::Response; type Error = T::Error; type Future = InFlightServiceResponse; fn poll_ready(&mut self) -> Poll<(), Self::Error> { let res = self.service.poll_ready(); if res.is_ok() && !self.count.available() { return Ok(Async::NotReady); } res } fn call(&mut self, req: Self::Request) -> Self::Future { InFlightServiceResponse { fut: self.service.call(req), guard: self.count.get(), } } } #[doc(hidden)] pub struct InFlightServiceResponse { fut: T::Future, #[allow(dead_code)] guard: CounterGuard, } impl Future for InFlightServiceResponse { type Item = T::Response; type Error = T::Error; fn poll(&mut self) -> Poll { self.fut.poll() } }