//! Simple leaky-bucket rate-limiter. use std::{ cell::RefCell, cmp::min, future::{ready, Ready}, }; use actix_web::{ body::EitherBody, dev::{forward_ready, Service, ServiceRequest, ServiceResponse, Transform}, Error, HttpResponse, }; use chrono::{Local, NaiveDateTime}; use futures_util::{future::LocalBoxFuture, FutureExt, TryFutureExt}; #[doc(hidden)] pub struct RateLimitService { service: S, token_bucket: RefCell, } impl Service for RateLimitService where S: Service, Error = Error>, S::Future: 'static, B: 'static, { type Response = ServiceResponse>; type Error = Error; type Future = LocalBoxFuture<'static, Result>; forward_ready!(service); fn call(&self, req: ServiceRequest) -> Self::Future { log::info!("request is passing through the AddMsg middleware"); if !self.token_bucket.borrow_mut().allow_query() { // request has been rate limited return Box::pin(async { Ok(req.into_response( HttpResponse::TooManyRequests() .finish() .map_into_right_body(), )) }); } self.service .call(req) .map_ok(ServiceResponse::map_into_left_body) .boxed_local() } } #[derive(Clone, Debug)] pub struct RateLimit { /// Request limit for 10 second period. limit: u64, } impl RateLimit { /// Constructs new rate limiter. pub fn new(limit: u64) -> Self { Self { limit } } } impl Transform for RateLimit where S: Service, Error = actix_web::Error>, S::Future: 'static, B: 'static, { type Response = ServiceResponse>; type Error = Error; type Transform = RateLimitService; type InitError = (); type Future = Ready>; fn new_transform(&self, service: S) -> Self::Future { ready(Ok(RateLimitService { service, token_bucket: RefCell::new(TokenBucket::new(self.limit)), })) } } struct TokenBucket { /// Request limit for 10 second period. limit: u64, /// Max number of requests for 10 second period, in this case equal to limit. capacity: u64, /// Time that last request was accepted. last_req_time: NaiveDateTime, /// Numbers of tokens remaining. /// /// Initialized equal to capacity. tokens: u64, } impl TokenBucket { /// Constructs new leaky bucket. fn new(limit: u64) -> Self { TokenBucket { limit, last_req_time: NaiveDateTime::UNIX_EPOCH, capacity: limit, tokens: 0, } } /// Mutates leaky bucket for accepted request. fn allow_query(&mut self) -> bool { let current_time = Local::now().naive_local(); let time_elapsed = (current_time.timestamp() - self.last_req_time.timestamp()) as u64; let tokens_to_add = time_elapsed * self.limit / 10; self.tokens = min(self.tokens + tokens_to_add, self.capacity); if self.tokens > 0 { self.last_req_time = current_time; self.tokens -= 1; true } else { false } } }