//! Service that applies a timeout to requests. //! //! If the response does not complete within the specified timeout, the response //! will be aborted. use std::fmt; use std::time::Duration; use actix_service::{NewTransform, Service, Transform}; use futures::future::{ok, FutureResult}; use futures::{Async, Future, Poll}; use tokio_timer::{clock, Delay}; /// Applies a timeout to requests. #[derive(Debug, Clone)] pub struct Timeout { timeout: Duration, } /// Timeout error pub enum TimeoutError { /// Service error Service(E), /// Service call timeout Timeout, } impl From for TimeoutError { fn from(err: E) -> Self { TimeoutError::Service(err) } } impl fmt::Debug for TimeoutError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { TimeoutError::Service(e) => write!(f, "TimeoutError::Service({:?})", e), TimeoutError::Timeout => write!(f, "TimeoutError::Timeout"), } } } impl PartialEq for TimeoutError { fn eq(&self, other: &TimeoutError) -> bool { match self { TimeoutError::Service(e1) => match other { TimeoutError::Service(e2) => e1 == e2, TimeoutError::Timeout => false, }, TimeoutError::Timeout => match other { TimeoutError::Service(_) => false, TimeoutError::Timeout => true, }, } } } impl Timeout { pub fn new(timeout: Duration) -> Self { Timeout { timeout } } } impl NewTransform for Timeout where S: Service, { type Request = S::Request; type Response = S::Response; type Error = TimeoutError; type InitError = (); type Transform = TimeoutService; type Future = FutureResult; fn new_transform(&self) -> Self::Future { ok(TimeoutService { timeout: self.timeout, }) } } /// Applies a timeout to requests. #[derive(Debug, Clone)] pub struct TimeoutService { timeout: Duration, } impl TimeoutService { pub fn new(timeout: Duration) -> Self { TimeoutService { timeout } } } impl Transform for TimeoutService where S: Service, { type Request = S::Request; type Response = S::Response; type Error = TimeoutError; type Future = TimeoutServiceResponse; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) } fn call(&mut self, request: S::Request, service: &mut S) -> Self::Future { TimeoutServiceResponse { fut: service.call(request), sleep: Delay::new(clock::now() + self.timeout), } } } /// `TimeoutService` response future #[derive(Debug)] pub struct TimeoutServiceResponse { fut: T::Future, sleep: Delay, } impl Future for TimeoutServiceResponse where T: Service, { type Item = T::Response; type Error = TimeoutError; fn poll(&mut self) -> Poll { // First, try polling the future match self.fut.poll() { Ok(Async::Ready(v)) => return Ok(Async::Ready(v)), Ok(Async::NotReady) => {} Err(e) => return Err(TimeoutError::Service(e)), } // Now check the sleep match self.sleep.poll() { Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::Ready(_)) => Err(TimeoutError::Timeout), Err(_) => Err(TimeoutError::Timeout), } } } #[cfg(test)] mod tests { use futures::future::lazy; use futures::{Async, Poll}; use std::time::Duration; use super::*; use actix_service::{Blank, BlankNewService, NewService, Service, ServiceExt}; struct SleepService(Duration); impl Service for SleepService { type Request = (); type Response = (); type Error = (); type Future = Box>; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) } fn call(&mut self, _: ()) -> Self::Future { Box::new(tokio_timer::sleep(self.0).map_err(|_| ())) } } #[test] fn test_success() { let resolution = Duration::from_millis(100); let wait_time = Duration::from_millis(50); let res = actix_rt::System::new("test").block_on(lazy(|| { let mut timeout = Blank::default() .apply(TimeoutService::new(resolution), SleepService(wait_time)); timeout.call(()) })); assert_eq!(res, Ok(())); } #[test] fn test_timeout() { let resolution = Duration::from_millis(100); let wait_time = Duration::from_millis(150); let res = actix_rt::System::new("test").block_on(lazy(|| { let mut timeout = Blank::default() .apply(TimeoutService::new(resolution), SleepService(wait_time)); timeout.call(()) })); assert_eq!(res, Err(TimeoutError::Timeout)); } #[test] fn test_timeout_newservice() { let resolution = Duration::from_millis(100); let wait_time = Duration::from_millis(150); let res = actix_rt::System::new("test").block_on(lazy(|| { let timeout = BlankNewService::default() .apply(Timeout::new(resolution), || Ok(SleepService(wait_time))); if let Async::Ready(mut to) = timeout.new_service().poll().unwrap() { to.call(()) } else { panic!() } })); assert_eq!(res, Err(TimeoutError::Timeout)); } }