//! Service that applies a timeout to requests. //! //! If the response does not complete within the specified timeout, the response //! will be aborted. use std::fmt; use std::time::Duration; use actix_service::{NewService, Service}; use futures::try_ready; use futures::{Async, Future, Poll}; use tokio_timer::{clock, Delay}; /// Applies a timeout to requests. #[derive(Debug)] pub struct Timeout { inner: T, timeout: Duration, } /// Timeout error pub enum TimeoutError { /// Service error Service(E), /// Service call timeout Timeout, } impl fmt::Debug for TimeoutError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { TimeoutError::Service(e) => write!(f, "TimeoutError::Service({:?})", e), TimeoutError::Timeout => write!(f, "TimeoutError::Timeout"), } } } impl Timeout { pub fn new(timeout: Duration, inner: T) -> Self where T: NewService + Clone, { Timeout { inner, timeout } } } impl NewService for Timeout where T: NewService + Clone, { type Response = T::Response; type Error = TimeoutError; type InitError = T::InitError; type Service = TimeoutService; type Future = TimeoutFut; fn new_service(&self) -> Self::Future { TimeoutFut { fut: self.inner.new_service(), timeout: self.timeout, } } } /// `Timeout` response future #[derive(Debug)] pub struct TimeoutFut, Request> { fut: T::Future, timeout: Duration, } impl Future for TimeoutFut where T: NewService, { type Item = TimeoutService; type Error = T::InitError; fn poll(&mut self) -> Poll { let service = try_ready!(self.fut.poll()); Ok(Async::Ready(TimeoutService::new(self.timeout, service))) } } /// Applies a timeout to requests. #[derive(Debug)] pub struct TimeoutService { inner: T, timeout: Duration, } impl TimeoutService { pub fn new(timeout: Duration, inner: T) -> Self where T: Service, { TimeoutService { inner, timeout } } } impl Clone for TimeoutService { fn clone(&self) -> Self { TimeoutService { inner: self.inner.clone(), timeout: self.timeout, } } } impl Service for TimeoutService where T: Service, { type Response = T::Response; type Error = TimeoutError; type Future = TimeoutServiceResponse; fn poll_ready(&mut self) -> Poll<(), Self::Error> { self.inner.poll_ready().map_err(TimeoutError::Service) } fn call(&mut self, request: Request) -> Self::Future { TimeoutServiceResponse { fut: self.inner.call(request), sleep: Delay::new(clock::now() + self.timeout), } } } /// `TimeoutService` response future #[derive(Debug)] pub struct TimeoutServiceResponse, Request> { fut: T::Future, sleep: Delay, } impl Future for TimeoutServiceResponse where T: Service, { type Item = T::Response; type Error = TimeoutError; fn poll(&mut self) -> Poll { // First, try polling the future match self.fut.poll() { Ok(Async::Ready(v)) => return Ok(Async::Ready(v)), Ok(Async::NotReady) => {} Err(e) => return Err(TimeoutError::Service(e)), } // Now check the sleep match self.sleep.poll() { Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::Ready(_)) => Err(TimeoutError::Timeout), Err(_) => Err(TimeoutError::Timeout), } } }