use std::collections::HashMap; use std::{fmt, io, net}; use actix_rt::net::TcpStream; use actix_service as actix; use actix_utils::counter::CounterGuard; use futures_util::future::{ok, Future, FutureExt, LocalBoxFuture}; use log::error; use super::builder::bind_addr; use super::service::{ BoxedServerService, InternalServiceFactory, ServerMessage, StreamService, }; use super::Token; pub struct ServiceConfig { pub(crate) services: Vec<(String, net::TcpListener)>, pub(crate) apply: Option>, pub(crate) threads: usize, pub(crate) backlog: i32, } impl ServiceConfig { pub(super) fn new(threads: usize, backlog: i32) -> ServiceConfig { ServiceConfig { threads, backlog, services: Vec::new(), apply: None, } } /// Set number of workers to start. /// /// By default server uses number of available logical cpu as workers /// count. pub fn workers(&mut self, num: usize) { self.threads = num; } /// Add new service to server pub fn bind>(&mut self, name: N, addr: U) -> io::Result<&mut Self> where U: net::ToSocketAddrs, { let sockets = bind_addr(addr, self.backlog)?; for lst in sockets { self.listen(name.as_ref(), lst); } Ok(self) } /// Add new service to server pub fn listen>(&mut self, name: N, lst: net::TcpListener) -> &mut Self { if self.apply.is_none() { self.apply = Some(Box::new(not_configured)); } self.services.push((name.as_ref().to_string(), lst)); self } /// Register service configuration function. This function get called /// during worker runtime configuration. It get executed in worker thread. pub fn apply(&mut self, f: F) -> io::Result<()> where F: Fn(&mut ServiceRuntime) + Send + Clone + 'static, { self.apply = Some(Box::new(f)); Ok(()) } } pub(super) struct ConfiguredService { rt: Box, names: HashMap, topics: HashMap, services: Vec, } impl ConfiguredService { pub(super) fn new(rt: Box) -> Self { ConfiguredService { rt, names: HashMap::new(), topics: HashMap::new(), services: Vec::new(), } } pub(super) fn stream(&mut self, token: Token, name: String, addr: net::SocketAddr) { self.names.insert(token, (name.clone(), addr)); self.topics.insert(name, token); self.services.push(token); } } impl InternalServiceFactory for ConfiguredService { fn name(&self, token: Token) -> &str { &self.names[&token].0 } fn clone_factory(&self) -> Box { Box::new(Self { rt: self.rt.clone(), names: self.names.clone(), topics: self.topics.clone(), services: self.services.clone(), }) } fn create(&self) -> LocalBoxFuture<'static, Result, ()>> { // configure services let mut rt = ServiceRuntime::new(self.topics.clone()); self.rt.configure(&mut rt); rt.validate(); let mut names = self.names.clone(); let tokens = self.services.clone(); // construct services async move { let mut services = rt.services; // TODO: Proper error handling here for f in rt.onstart.into_iter() { f.await; } let mut res = vec![]; for token in tokens { if let Some(srv) = services.remove(&token) { let newserv = srv.new_service(()); match newserv.await { Ok(serv) => { res.push((token, serv)); } Err(_) => { error!("Can not construct service"); return Err(()); } } } else { let name = names.remove(&token).unwrap().0; res.push(( token, Box::new(StreamService::new(actix::fn_service( move |_: TcpStream| { error!("Service {:?} is not configured", name); ok::<_, ()>(()) }, ))), )); }; } return Ok(res); } .boxed_local() } } pub(super) trait ServiceRuntimeConfiguration: Send { fn clone(&self) -> Box; fn configure(&self, rt: &mut ServiceRuntime); } impl ServiceRuntimeConfiguration for F where F: Fn(&mut ServiceRuntime) + Send + Clone + 'static, { fn clone(&self) -> Box { Box::new(self.clone()) } fn configure(&self, rt: &mut ServiceRuntime) { (self)(rt) } } fn not_configured(_: &mut ServiceRuntime) { error!("Service is not configured"); } pub struct ServiceRuntime { names: HashMap, services: HashMap, onstart: Vec>, } impl ServiceRuntime { fn new(names: HashMap) -> Self { ServiceRuntime { names, services: HashMap::new(), onstart: Vec::new(), } } fn validate(&self) { for (name, token) in &self.names { if !self.services.contains_key(&token) { error!("Service {:?} is not configured", name); } } } /// Register service. /// /// Name of the service must be registered during configuration stage with /// *ServiceConfig::bind()* or *ServiceConfig::listen()* methods. pub fn service(&mut self, name: &str, service: F) where F: actix::IntoServiceFactory, T: actix::ServiceFactory + 'static, T::Future: 'static, T::Service: 'static, T::InitError: fmt::Debug, { // let name = name.to_owned(); if let Some(token) = self.names.get(name) { self.services.insert( *token, Box::new(ServiceFactory { inner: service.into_factory(), }), ); } else { panic!("Unknown service: {:?}", name); } } /// Execute future before services initialization. pub fn on_start(&mut self, fut: F) where F: Future + 'static, { self.onstart.push(fut.boxed_local()) } } type BoxedNewService = Box< dyn actix::ServiceFactory< Request = (Option, ServerMessage), Response = (), Error = (), InitError = (), Config = (), Service = BoxedServerService, Future = LocalBoxFuture<'static, Result>, >, >; struct ServiceFactory { inner: T, } impl actix::ServiceFactory for ServiceFactory where T: actix::ServiceFactory, T::Future: 'static, T::Service: 'static, T::Error: 'static, T::InitError: fmt::Debug + 'static, { type Request = (Option, ServerMessage); type Response = (); type Error = (); type InitError = (); type Config = (); type Service = BoxedServerService; type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, _: ()) -> Self::Future { let fut = self.inner.new_service(()); async move { return match fut.await { Ok(s) => Ok(Box::new(StreamService::new(s)) as BoxedServerService), Err(e) => { error!("Can not construct service: {:?}", e); Err(()) } }; } .boxed_local() } }