diff --git a/src/inflight.rs b/src/inflight.rs new file mode 100644 index 00000000..6ee82a8c --- /dev/null +++ b/src/inflight.rs @@ -0,0 +1,128 @@ +use futures::{Async, Future, Poll}; + +use super::counter::{Counter, CounterGuard}; +use super::service::{IntoNewService, IntoService, NewService, Service}; + +/// InFlight - new service for service that can limit number of in-flight +/// async requests. +/// +/// Default number of in-flight requests is 15 +pub struct InFlight { + factory: T, + max_inflight: usize, +} + +impl InFlight +where + T: NewService, +{ + pub fn new>(factory: F) -> Self { + Self { + factory: factory.into_new_service(), + max_inflight: 15, + } + } + + /// Set max number of in-flight requests. + /// + /// By default max in-flight requests is 15. + pub fn max_inflight(mut self, max: usize) -> Self { + self.max_inflight = max; + self + } +} + +impl NewService for InFlight +where + T: NewService, +{ + type Request = T::Request; + type Response = T::Response; + type Error = T::Error; + type InitError = T::InitError; + type Service = InFlightService; + type Future = InFlightResponseFuture; + + fn new_service(&self) -> Self::Future { + InFlightResponseFuture { + fut: self.factory.new_service(), + max_inflight: self.max_inflight, + } + } +} + +pub struct InFlightResponseFuture { + fut: T::Future, + max_inflight: usize, +} + +impl Future for InFlightResponseFuture { + type Item = InFlightService; + type Error = T::InitError; + + fn poll(&mut self) -> Poll { + Ok(Async::Ready(InFlightService::with_max_inflight( + self.max_inflight, + try_ready!(self.fut.poll()), + ))) + } +} + +pub struct InFlightService { + service: T, + count: Counter, +} + +impl InFlightService { + pub fn new>(service: F) -> Self { + Self { + service: service.into_service(), + count: Counter::new(15), + } + } + + pub fn with_max_inflight>(max: usize, service: F) -> Self { + Self { + service: service.into_service(), + count: Counter::new(max), + } + } +} + +impl Service for InFlightService { + type Request = T::Request; + type Response = T::Response; + type Error = T::Error; + type Future = InFlightServiceResponse; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + let res = self.service.poll_ready(); + if res.is_ok() && !self.count.available() { + return Ok(Async::NotReady); + } + res + } + + fn call(&mut self, req: Self::Request) -> Self::Future { + InFlightServiceResponse { + fut: self.service.call(req), + guard: self.count.get(), + } + } +} + +#[doc(hidden)] +pub struct InFlightServiceResponse { + fut: T::Future, + #[allow(dead_code)] + guard: CounterGuard, +} + +impl Future for InFlightServiceResponse { + type Item = T::Response; + type Error = T::Error; + + fn poll(&mut self) -> Poll { + self.fut.poll() + } +} diff --git a/src/lib.rs b/src/lib.rs index 446222f7..8a82a0b9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -58,6 +58,7 @@ extern crate webpki_roots; pub mod connector; pub mod counter; pub mod framed; +pub mod inflight; pub mod resolver; pub mod server; pub mod service;