1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-12-18 21:13:12 +01:00
actix-net/actix-utils/src/timeout.rs

247 lines
6.4 KiB
Rust
Raw Normal View History

2018-10-30 04:29:47 +01:00
//! Service that applies a timeout to requests.
2018-10-24 06:38:36 +02:00
//!
//! If the response does not complete within the specified timeout, the response
//! will be aborted.
use std::fmt;
2019-02-03 20:33:26 +01:00
use std::marker::PhantomData;
2018-10-24 06:38:36 +02:00
use std::time::Duration;
use actix_service::{IntoService, Service, Transform};
use futures::future::{ok, FutureResult};
2018-10-24 06:38:36 +02:00
use futures::{Async, Future, Poll};
use tokio_timer::{clock, Delay};
/// Transform that applies a timeout to the requests of the service it wraps.
///
/// `E` is the `InitError` type reported by the `Transform` implementation. It is
/// carried purely as a marker; no value of type `E` is ever stored.
pub struct Timeout<E = ()> {
    /// Maximum time a single service call may take.
    timeout: Duration,
    /// Marker for the otherwise unused init-error type parameter.
    _t: PhantomData<E>,
}

// `Debug` is implemented by hand: a derived impl would demand `E: Debug` even
// though `E` is only a marker. The output is the same as the derived one.
impl<E> fmt::Debug for Timeout<E> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("Timeout")
            .field("timeout", &self.timeout)
            .field("_t", &self._t)
            .finish()
    }
}
/// Error returned by a service that is guarded by a timeout.
pub enum TimeoutError<E> {
    /// The wrapped service failed with its own error.
    Service(E),
    /// The service call timed out.
    Timeout,
}
/// Lifts an error of the inner service into `TimeoutError::Service`.
impl<E> From<E> for TimeoutError<E> {
    fn from(service_err: E) -> TimeoutError<E> {
        TimeoutError::Service(service_err)
    }
}
2018-10-24 06:38:36 +02:00
/// Debug output names the variant with its full `TimeoutError::` path.
impl<E> fmt::Debug for TimeoutError<E>
where
    E: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match *self {
            TimeoutError::Timeout => f.write_str("TimeoutError::Timeout"),
            // `write!` formats the payload with a plain `{:?}`, independent of
            // any flags set on the outer formatter.
            TimeoutError::Service(ref inner) => {
                write!(f, "TimeoutError::Service({:?})", inner)
            }
        }
    }
}
2019-02-11 17:34:57 +01:00
/// User-facing text: the inner error's own message, or a fixed timeout message.
impl<E> fmt::Display for TimeoutError<E>
where
    E: fmt::Display,
{
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match *self {
            TimeoutError::Timeout => f.write_str("Service call timeout"),
            // Hand the formatter straight to the inner error so width/fill
            // flags apply to its message.
            TimeoutError::Service(ref inner) => fmt::Display::fmt(inner, f),
        }
    }
}
/// Two errors are equal when they are the same variant and, for `Service`,
/// the wrapped errors compare equal.
impl<E> PartialEq for TimeoutError<E>
where
    E: PartialEq,
{
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            (TimeoutError::Service(lhs), TimeoutError::Service(rhs)) => lhs == rhs,
            (TimeoutError::Timeout, TimeoutError::Timeout) => true,
            (TimeoutError::Service(_), TimeoutError::Timeout)
            | (TimeoutError::Timeout, TimeoutError::Service(_)) => false,
        }
    }
}
2019-02-03 20:33:26 +01:00
impl<E> Timeout<E> {
pub fn new(timeout: Duration) -> Self {
2019-02-03 20:33:26 +01:00
Timeout {
timeout,
_t: PhantomData,
}
2018-10-24 06:38:36 +02:00
}
}
2019-02-03 20:37:34 +01:00
impl<E> Clone for Timeout<E> {
fn clone(&self) -> Self {
Timeout::new(self.timeout)
}
}
2019-03-09 16:27:35 +01:00
impl<S, E> Transform<S> for Timeout<E>
2018-10-24 06:38:36 +02:00
where
2019-03-09 16:27:35 +01:00
S: Service,
2018-10-24 06:38:36 +02:00
{
2019-03-09 16:27:35 +01:00
type Request = S::Request;
type Response = S::Response;
type Error = TimeoutError<S::Error>;
2019-02-03 20:33:26 +01:00
type InitError = E;
2019-03-05 04:38:11 +01:00
type Transform = TimeoutService<S>;
type Future = FutureResult<Self::Transform, Self::InitError>;
2019-03-05 04:38:11 +01:00
fn new_transform(&self, service: S) -> Self::Future {
ok(TimeoutService {
2019-03-05 04:38:11 +01:00
service,
timeout: self.timeout,
})
2018-10-24 06:38:36 +02:00
}
}
/// Service wrapper that applies a timeout to every call of the inner service.
#[derive(Debug, Clone)]
pub struct TimeoutService<S> {
    /// The wrapped service that actually handles requests.
    service: S,
    /// Maximum time a single call to `service` may take.
    timeout: Duration,
}
impl<S> TimeoutService<S>
where
S: Service,
{
pub fn new<U>(timeout: Duration, service: U) -> Self
where
U: IntoService<S>,
{
TimeoutService {
timeout,
service: service.into_service(),
}
2018-10-24 06:38:36 +02:00
}
}
2019-03-09 16:27:35 +01:00
impl<S> Service for TimeoutService<S>
2018-10-24 06:38:36 +02:00
where
2019-03-09 16:27:35 +01:00
S: Service,
2018-10-24 06:38:36 +02:00
{
2019-03-09 16:27:35 +01:00
type Request = S::Request;
type Response = S::Response;
type Error = TimeoutError<S::Error>;
2019-03-09 16:27:35 +01:00
type Future = TimeoutServiceResponse<S>;
2018-10-24 06:38:36 +02:00
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
2019-03-05 04:38:11 +01:00
self.service.poll_ready().map_err(TimeoutError::Service)
2018-10-24 06:38:36 +02:00
}
2019-03-09 16:27:35 +01:00
fn call(&mut self, request: S::Request) -> Self::Future {
2018-10-24 06:38:36 +02:00
TimeoutServiceResponse {
2019-03-05 04:38:11 +01:00
fut: self.service.call(request),
2018-10-24 06:38:36 +02:00
sleep: Delay::new(clock::now() + self.timeout),
}
}
}
/// `TimeoutService` response future
#[derive(Debug)]
2019-03-09 16:27:35 +01:00
pub struct TimeoutServiceResponse<T: Service> {
2018-10-24 06:38:36 +02:00
fut: T::Future,
sleep: Delay,
}
2019-03-09 16:27:35 +01:00
impl<T> Future for TimeoutServiceResponse<T>
2018-10-24 06:38:36 +02:00
where
2019-03-09 16:27:35 +01:00
T: Service,
2018-10-24 06:38:36 +02:00
{
type Item = T::Response;
type Error = TimeoutError<T::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// First, try polling the future
match self.fut.poll() {
Ok(Async::Ready(v)) => return Ok(Async::Ready(v)),
Ok(Async::NotReady) => {}
Err(e) => return Err(TimeoutError::Service(e)),
}
// Now check the sleep
match self.sleep.poll() {
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(_)) => Err(TimeoutError::Timeout),
Err(_) => Err(TimeoutError::Timeout),
}
}
}
#[cfg(test)]
mod tests {
    use futures::future::lazy;
    use futures::{Async, Poll};

    use std::time::Duration;

    use super::*;
    use actix_service::blank::{Blank, BlankNewService};
    use actix_service::{NewService, Service, ServiceExt};

    /// Test service whose response future sleeps for the wrapped duration.
    struct SleepService(Duration);

    impl Service for SleepService {
        type Request = ();
        type Response = ();
        type Error = ();
        type Future = Box<dyn Future<Item = (), Error = ()>>;

        fn poll_ready(&mut self) -> Poll<(), Self::Error> {
            Ok(Async::Ready(()))
        }

        fn call(&mut self, _: ()) -> Self::Future {
            Box::new(tokio_timer::sleep(self.0).map_err(|_| ()))
        }
    }

    /// The inner service finishes (50ms) before the timeout (100ms) elapses.
    #[test]
    fn test_success() {
        let resolution = Duration::from_millis(100);
        let wait_time = Duration::from_millis(50);

        let res = actix_rt::System::new("test").block_on(lazy(|| {
            let mut timeout = Blank::default()
                .and_then(TimeoutService::new(resolution, SleepService(wait_time)));
            timeout.call(())
        }));
        assert_eq!(res, Ok(()));
    }

    /// The inner service (150ms) outlasts the timeout (100ms).
    #[test]
    fn test_timeout() {
        let resolution = Duration::from_millis(100);
        let wait_time = Duration::from_millis(150);

        let res = actix_rt::System::new("test").block_on(lazy(|| {
            let mut timeout = Blank::default()
                .and_then(TimeoutService::new(resolution, SleepService(wait_time)));
            timeout.call(())
        }));
        assert_eq!(res, Err(TimeoutError::Timeout));
    }

    /// Same as `test_timeout`, but the service is built through the `Timeout`
    /// transform applied to a new-service factory.
    #[test]
    fn test_timeout_newservice() {
        let resolution = Duration::from_millis(100);
        let wait_time = Duration::from_millis(150);

        let res = actix_rt::System::new("test").block_on(lazy(|| {
            let timeout = BlankNewService::<(), (), ()>::default()
                .apply(Timeout::new(resolution), || Ok(SleepService(wait_time)));
            if let Async::Ready(mut to) = timeout.new_service(&()).poll().unwrap() {
                to.call(())
            } else {
                panic!()
            }
        }));
        assert_eq!(res, Err(TimeoutError::Timeout));
    }
}