1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-12-18 20:13:10 +01:00
actix-net/actix-utils/src/timeout.rs

173 lines
3.9 KiB
Rust
Raw Normal View History

2018-10-30 04:29:47 +01:00
//! Service that applies a timeout to requests.
//!
//! If the wrapped service does not produce a response within the specified
//! timeout, the call fails with `TimeoutError::Timeout`.
use std::fmt;
use std::time::Duration;
2018-12-09 19:15:49 +01:00
use actix_service::{NewService, Service};
2018-12-06 23:04:42 +01:00
use futures::try_ready;
2018-10-24 06:38:36 +02:00
use futures::{Async, Future, Poll};
use tokio_timer::{clock, Delay};
/// Service factory that applies a timeout to requests.
///
/// Every service produced by the wrapped factory is wrapped in a
/// `TimeoutService` configured with the same `timeout`.
#[derive(Debug)]
pub struct Timeout<T> {
    /// Wrapped service factory.
    inner: T,
    /// Time limit passed on to every service this factory creates.
    timeout: Duration,
}
/// Error produced by a service that runs under a timeout.
pub enum TimeoutError<E> {
    /// The wrapped service failed with its own error.
    Service(E),
    /// The service call did not complete before the timeout elapsed.
    Timeout,
}

impl<E: fmt::Debug> fmt::Debug for TimeoutError<E> {
    /// Formats the variant name, including the inner error for `Service`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            TimeoutError::Service(e) => write!(f, "TimeoutError::Service({:?})", e),
            TimeoutError::Timeout => write!(f, "TimeoutError::Timeout"),
        }
    }
}

impl<E: fmt::Display> fmt::Display for TimeoutError<E> {
    /// Shows the inner error's message for `Service`, and a fixed message
    /// for `Timeout`, so the error can be formatted with `{}`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            // Delegate so that formatter flags (width, padding, ...) reach
            // the inner error unchanged.
            TimeoutError::Service(e) => fmt::Display::fmt(e, f),
            TimeoutError::Timeout => f.write_str("Service call timeout"),
        }
    }
}
2018-11-30 03:56:15 +01:00
impl<T> Timeout<T> {
pub fn new(timeout: Duration, inner: T) -> Self
2018-11-30 03:56:15 +01:00
where
T: NewService + Clone,
2018-11-30 03:56:15 +01:00
{
2018-10-24 06:38:36 +02:00
Timeout { inner, timeout }
}
}
impl<T: Clone> Clone for Timeout<T> {
    /// Clones the wrapped factory and copies the configured timeout.
    fn clone(&self) -> Self {
        let inner = self.inner.clone();
        let timeout = self.timeout;
        Timeout { inner, timeout }
    }
}
impl<T> NewService for Timeout<T>
2018-10-24 06:38:36 +02:00
where
T: NewService + Clone,
2018-10-24 06:38:36 +02:00
{
type Request = T::Request;
2018-10-24 06:38:36 +02:00
type Response = T::Response;
type Error = TimeoutError<T::Error>;
type InitError = T::InitError;
type Service = TimeoutService<T::Service>;
type Future = TimeoutFut<T>;
2018-10-24 06:38:36 +02:00
fn new_service(&self) -> Self::Future {
TimeoutFut {
fut: self.inner.new_service(),
2018-12-06 23:04:42 +01:00
timeout: self.timeout,
2018-10-24 06:38:36 +02:00
}
}
}
2018-10-30 04:29:47 +01:00
/// `Timeout` response future
#[derive(Debug)]
pub struct TimeoutFut<T: NewService> {
2018-10-30 04:29:47 +01:00
fut: T::Future,
timeout: Duration,
}
impl<T> Future for TimeoutFut<T>
2018-10-24 06:38:36 +02:00
where
T: NewService,
2018-10-24 06:38:36 +02:00
{
type Item = TimeoutService<T::Service>;
type Error = T::InitError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let service = try_ready!(self.fut.poll());
Ok(Async::Ready(TimeoutService::new(self.timeout, service)))
}
}
/// Service wrapper that puts a time limit on every call to the inner
/// service.
#[derive(Debug)]
pub struct TimeoutService<T> {
    /// Wrapped service.
    inner: T,
    /// Time limit applied to each call.
    timeout: Duration,
}
impl<T> TimeoutService<T> {
pub fn new(timeout: Duration, inner: T) -> Self
2018-11-30 03:56:15 +01:00
where
T: Service,
2018-11-30 03:56:15 +01:00
{
2018-10-24 06:38:36 +02:00
TimeoutService { inner, timeout }
}
}
2018-11-30 03:56:15 +01:00
impl<T: Clone> Clone for TimeoutService<T> {
2018-10-24 06:38:36 +02:00
fn clone(&self) -> Self {
TimeoutService {
inner: self.inner.clone(),
timeout: self.timeout,
}
}
}
impl<T> Service for TimeoutService<T>
2018-10-24 06:38:36 +02:00
where
T: Service,
2018-10-24 06:38:36 +02:00
{
type Request = T::Request;
2018-10-24 06:38:36 +02:00
type Response = T::Response;
type Error = TimeoutError<T::Error>;
type Future = TimeoutServiceResponse<T>;
2018-10-24 06:38:36 +02:00
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
2018-12-06 23:04:42 +01:00
self.inner.poll_ready().map_err(TimeoutError::Service)
2018-10-24 06:38:36 +02:00
}
fn call(&mut self, request: T::Request) -> Self::Future {
2018-10-24 06:38:36 +02:00
TimeoutServiceResponse {
fut: self.inner.call(request),
sleep: Delay::new(clock::now() + self.timeout),
}
}
}
/// `TimeoutService` response future
#[derive(Debug)]
pub struct TimeoutServiceResponse<T: Service> {
2018-10-24 06:38:36 +02:00
fut: T::Future,
sleep: Delay,
}
impl<T> Future for TimeoutServiceResponse<T>
2018-10-24 06:38:36 +02:00
where
T: Service,
2018-10-24 06:38:36 +02:00
{
type Item = T::Response;
type Error = TimeoutError<T::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// First, try polling the future
match self.fut.poll() {
Ok(Async::Ready(v)) => return Ok(Async::Ready(v)),
Ok(Async::NotReady) => {}
Err(e) => return Err(TimeoutError::Service(e)),
}
// Now check the sleep
match self.sleep.poll() {
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(_)) => Err(TimeoutError::Timeout),
Err(_) => Err(TimeoutError::Timeout),
}
}
}