2018-10-29 20:29:47 -07:00
|
|
|
//! Service that applies a timeout to requests.
|
2018-10-23 21:38:36 -07:00
|
|
|
//!
|
|
|
|
//! If the response does not complete within the specified timeout, the response
|
|
|
|
//! will be aborted.
|
2019-11-14 18:38:24 +06:00
|
|
|
use std::future::Future;
|
2019-02-03 11:33:26 -08:00
|
|
|
use std::marker::PhantomData;
|
2019-11-14 18:38:24 +06:00
|
|
|
use std::pin::Pin;
|
|
|
|
use std::task::{Context, Poll};
|
|
|
|
use std::{fmt, time};
|
2018-10-23 21:38:36 -07:00
|
|
|
|
2019-03-12 13:39:04 -07:00
|
|
|
use actix_service::{IntoService, Service, Transform};
|
2019-11-14 18:38:24 +06:00
|
|
|
use futures::future::{ok, Ready};
|
|
|
|
use tokio_timer::{clock, delay, Delay};
|
2018-10-23 21:38:36 -07:00
|
|
|
|
|
|
|
/// Applies a timeout to requests.
|
2019-02-03 11:37:34 -08:00
|
|
|
#[derive(Debug)]
|
2019-02-03 11:33:26 -08:00
|
|
|
pub struct Timeout<E = ()> {
|
2019-11-14 18:38:24 +06:00
|
|
|
timeout: time::Duration,
|
2019-02-03 11:33:26 -08:00
|
|
|
_t: PhantomData<E>,
|
2018-10-23 21:38:36 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Timeout error
|
|
|
|
pub enum TimeoutError<E> {
|
|
|
|
/// Service error
|
|
|
|
Service(E),
|
|
|
|
/// Service call timeout
|
|
|
|
Timeout,
|
|
|
|
}
|
|
|
|
|
2019-02-03 10:42:27 -08:00
|
|
|
impl<E> From<E> for TimeoutError<E> {
|
|
|
|
fn from(err: E) -> Self {
|
|
|
|
TimeoutError::Service(err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-10-23 21:38:36 -07:00
|
|
|
impl<E: fmt::Debug> fmt::Debug for TimeoutError<E> {
|
|
|
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
|
|
|
match self {
|
|
|
|
TimeoutError::Service(e) => write!(f, "TimeoutError::Service({:?})", e),
|
|
|
|
TimeoutError::Timeout => write!(f, "TimeoutError::Timeout"),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-02-11 08:34:57 -08:00
|
|
|
impl<E: fmt::Display> fmt::Display for TimeoutError<E> {
|
|
|
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
|
|
|
match self {
|
|
|
|
TimeoutError::Service(e) => e.fmt(f),
|
|
|
|
TimeoutError::Timeout => write!(f, "Service call timeout"),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-02-03 10:42:27 -08:00
|
|
|
impl<E: PartialEq> PartialEq for TimeoutError<E> {
|
|
|
|
fn eq(&self, other: &TimeoutError<E>) -> bool {
|
|
|
|
match self {
|
|
|
|
TimeoutError::Service(e1) => match other {
|
|
|
|
TimeoutError::Service(e2) => e1 == e2,
|
|
|
|
TimeoutError::Timeout => false,
|
|
|
|
},
|
|
|
|
TimeoutError::Timeout => match other {
|
|
|
|
TimeoutError::Service(_) => false,
|
|
|
|
TimeoutError::Timeout => true,
|
|
|
|
},
|
2019-01-25 14:31:27 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-02-03 11:33:26 -08:00
|
|
|
impl<E> Timeout<E> {
|
2019-11-14 18:38:24 +06:00
|
|
|
pub fn new(timeout: time::Duration) -> Self {
|
2019-02-03 11:33:26 -08:00
|
|
|
Timeout {
|
|
|
|
timeout,
|
|
|
|
_t: PhantomData,
|
|
|
|
}
|
2018-10-23 21:38:36 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-02-03 11:37:34 -08:00
|
|
|
impl<E> Clone for Timeout<E> {
|
|
|
|
fn clone(&self) -> Self {
|
|
|
|
Timeout::new(self.timeout)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-03-09 07:27:35 -08:00
|
|
|
impl<S, E> Transform<S> for Timeout<E>
|
2018-10-23 21:38:36 -07:00
|
|
|
where
|
2019-03-09 07:27:35 -08:00
|
|
|
S: Service,
|
2018-10-23 21:38:36 -07:00
|
|
|
{
|
2019-03-09 07:27:35 -08:00
|
|
|
type Request = S::Request;
|
2019-02-03 10:42:27 -08:00
|
|
|
type Response = S::Response;
|
|
|
|
type Error = TimeoutError<S::Error>;
|
2019-02-03 11:33:26 -08:00
|
|
|
type InitError = E;
|
2019-03-04 19:38:11 -08:00
|
|
|
type Transform = TimeoutService<S>;
|
2019-11-14 18:38:24 +06:00
|
|
|
type Future = Ready<Result<Self::Transform, Self::InitError>>;
|
2019-02-03 10:42:27 -08:00
|
|
|
|
2019-03-04 19:38:11 -08:00
|
|
|
fn new_transform(&self, service: S) -> Self::Future {
|
2019-02-03 10:42:27 -08:00
|
|
|
ok(TimeoutService {
|
2019-03-04 19:38:11 -08:00
|
|
|
service,
|
2019-02-03 10:42:27 -08:00
|
|
|
timeout: self.timeout,
|
|
|
|
})
|
2018-10-23 21:38:36 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Applies a timeout to requests.
|
2019-02-03 10:42:27 -08:00
|
|
|
#[derive(Debug, Clone)]
|
2019-03-04 19:38:11 -08:00
|
|
|
pub struct TimeoutService<S> {
|
|
|
|
service: S,
|
2019-11-14 18:38:24 +06:00
|
|
|
timeout: time::Duration,
|
2018-10-23 21:38:36 -07:00
|
|
|
}
|
|
|
|
|
2019-03-12 13:39:04 -07:00
|
|
|
impl<S> TimeoutService<S>
|
|
|
|
where
|
|
|
|
S: Service,
|
|
|
|
{
|
2019-11-14 18:38:24 +06:00
|
|
|
pub fn new<U>(timeout: time::Duration, service: U) -> Self
|
2019-03-12 13:39:04 -07:00
|
|
|
where
|
|
|
|
U: IntoService<S>,
|
|
|
|
{
|
|
|
|
TimeoutService {
|
|
|
|
timeout,
|
|
|
|
service: service.into_service(),
|
|
|
|
}
|
2018-10-23 21:38:36 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-03-09 07:27:35 -08:00
|
|
|
impl<S> Service for TimeoutService<S>
|
2018-10-23 21:38:36 -07:00
|
|
|
where
|
2019-03-09 07:27:35 -08:00
|
|
|
S: Service,
|
2018-10-23 21:38:36 -07:00
|
|
|
{
|
2019-03-09 07:27:35 -08:00
|
|
|
type Request = S::Request;
|
2019-02-03 10:42:27 -08:00
|
|
|
type Response = S::Response;
|
|
|
|
type Error = TimeoutError<S::Error>;
|
2019-03-09 07:27:35 -08:00
|
|
|
type Future = TimeoutServiceResponse<S>;
|
2018-10-23 21:38:36 -07:00
|
|
|
|
2019-11-14 18:38:24 +06:00
|
|
|
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
|
|
|
self.service.poll_ready(cx).map_err(TimeoutError::Service)
|
2018-10-23 21:38:36 -07:00
|
|
|
}
|
|
|
|
|
2019-03-09 07:27:35 -08:00
|
|
|
fn call(&mut self, request: S::Request) -> Self::Future {
|
2018-10-23 21:38:36 -07:00
|
|
|
TimeoutServiceResponse {
|
2019-03-04 19:38:11 -08:00
|
|
|
fut: self.service.call(request),
|
2019-11-14 18:38:24 +06:00
|
|
|
sleep: delay(clock::now() + self.timeout),
|
2018-10-23 21:38:36 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// `TimeoutService` response future
|
2019-11-19 14:51:40 +06:00
|
|
|
#[pin_project::pin_project]
|
2018-10-23 21:38:36 -07:00
|
|
|
#[derive(Debug)]
|
2019-03-09 07:27:35 -08:00
|
|
|
pub struct TimeoutServiceResponse<T: Service> {
|
2019-11-19 14:51:40 +06:00
|
|
|
#[pin]
|
2018-10-23 21:38:36 -07:00
|
|
|
fut: T::Future,
|
|
|
|
sleep: Delay,
|
|
|
|
}
|
|
|
|
|
2019-03-09 07:27:35 -08:00
|
|
|
impl<T> Future for TimeoutServiceResponse<T>
|
2018-10-23 21:38:36 -07:00
|
|
|
where
|
2019-03-09 07:27:35 -08:00
|
|
|
T: Service,
|
2018-10-23 21:38:36 -07:00
|
|
|
{
|
2019-11-14 18:38:24 +06:00
|
|
|
type Output = Result<T::Response, TimeoutError<T::Error>>;
|
|
|
|
|
|
|
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
2019-11-19 14:51:40 +06:00
|
|
|
let mut this = self.project();
|
2018-10-23 21:38:36 -07:00
|
|
|
|
|
|
|
// First, try polling the future
|
2019-11-19 14:51:40 +06:00
|
|
|
match this.fut.poll(cx) {
|
2019-11-14 18:38:24 +06:00
|
|
|
Poll::Ready(Ok(v)) => return Poll::Ready(Ok(v)),
|
|
|
|
Poll::Ready(Err(e)) => return Poll::Ready(Err(TimeoutError::Service(e))),
|
|
|
|
Poll::Pending => {}
|
2018-10-23 21:38:36 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
// Now check the sleep
|
2019-11-14 18:38:24 +06:00
|
|
|
match Pin::new(&mut this.sleep).poll(cx) {
|
|
|
|
Poll::Pending => Poll::Pending,
|
|
|
|
Poll::Ready(_) => Poll::Ready(Err(TimeoutError::Timeout)),
|
2018-10-23 21:38:36 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2019-02-03 10:42:27 -08:00
|
|
|
|
|
|
|
#[cfg(test)]
|
|
|
|
mod tests {
|
2019-11-14 18:38:24 +06:00
|
|
|
use std::task::{Context, Poll};
|
2019-02-03 10:42:27 -08:00
|
|
|
use std::time::Duration;
|
|
|
|
|
|
|
|
use super::*;
|
2019-11-14 18:38:24 +06:00
|
|
|
use actix_service::{apply, factory_fn, Service, ServiceFactory};
|
|
|
|
use futures::future::{ok, FutureExt, LocalBoxFuture};
|
2019-02-03 10:42:27 -08:00
|
|
|
|
|
|
|
struct SleepService(Duration);
|
|
|
|
|
2019-03-09 07:27:35 -08:00
|
|
|
impl Service for SleepService {
|
|
|
|
type Request = ();
|
2019-02-03 10:42:27 -08:00
|
|
|
type Response = ();
|
|
|
|
type Error = ();
|
2019-11-14 18:38:24 +06:00
|
|
|
type Future = LocalBoxFuture<'static, Result<(), ()>>;
|
2019-02-03 10:42:27 -08:00
|
|
|
|
2019-11-14 18:38:24 +06:00
|
|
|
fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
|
|
|
|
Poll::Ready(Ok(()))
|
2019-02-03 10:42:27 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
fn call(&mut self, _: ()) -> Self::Future {
|
2019-11-14 18:38:24 +06:00
|
|
|
tokio_timer::delay_for(self.0)
|
|
|
|
.then(|_| ok::<_, ()>(()))
|
|
|
|
.boxed_local()
|
2019-02-03 10:42:27 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn test_success() {
|
|
|
|
let resolution = Duration::from_millis(100);
|
|
|
|
let wait_time = Duration::from_millis(50);
|
|
|
|
|
2019-11-14 18:38:24 +06:00
|
|
|
let res = actix_rt::System::new("test").block_on(async {
|
|
|
|
let mut timeout = TimeoutService::new(resolution, SleepService(wait_time));
|
|
|
|
timeout.call(()).await
|
|
|
|
});
|
2019-02-03 10:42:27 -08:00
|
|
|
assert_eq!(res, Ok(()));
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn test_timeout() {
|
|
|
|
let resolution = Duration::from_millis(100);
|
|
|
|
let wait_time = Duration::from_millis(150);
|
|
|
|
|
2019-11-14 18:38:24 +06:00
|
|
|
let res = actix_rt::System::new("test").block_on(async {
|
|
|
|
let mut timeout = TimeoutService::new(resolution, SleepService(wait_time));
|
|
|
|
timeout.call(()).await
|
|
|
|
});
|
2019-02-03 10:42:27 -08:00
|
|
|
assert_eq!(res, Err(TimeoutError::Timeout));
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn test_timeout_newservice() {
|
|
|
|
let resolution = Duration::from_millis(100);
|
|
|
|
let wait_time = Duration::from_millis(150);
|
|
|
|
|
2019-11-14 18:38:24 +06:00
|
|
|
let res = actix_rt::System::new("test").block_on(async {
|
|
|
|
let timeout = apply(
|
|
|
|
Timeout::new(resolution),
|
|
|
|
factory_fn(|| ok::<_, ()>(SleepService(wait_time))),
|
|
|
|
);
|
|
|
|
let mut srv = timeout.new_service(&()).await.unwrap();
|
|
|
|
|
|
|
|
srv.call(()).await
|
|
|
|
});
|
2019-02-03 10:42:27 -08:00
|
|
|
assert_eq!(res, Err(TimeoutError::Timeout));
|
|
|
|
}
|
|
|
|
}
|