use std::collections::HashMap; use std::{fmt, io, net}; use actix_service::{IntoNewService, NewService}; use futures::future::{join_all, Future, IntoFuture}; use log::error; use tokio_tcp::TcpStream; use crate::counter::CounterGuard; use super::builder::bind_addr; use super::services::{ BoxedServerService, InternalServiceFactory, ServerMessage, StreamService, }; use super::Token; pub struct ServiceConfig { pub(crate) services: Vec<(String, net::TcpListener)>, pub(crate) apply: Option>, pub(crate) threads: usize, } impl ServiceConfig { pub(super) fn new(threads: usize) -> ServiceConfig { ServiceConfig { threads, services: Vec::new(), apply: None, } } /// Set number of workers to start. /// /// By default server uses number of available logical cpu as workers /// count. pub fn workers(&mut self, num: usize) { self.threads = num; } /// Add new service to server pub fn bind>(&mut self, name: N, addr: U) -> io::Result<&mut Self> where U: net::ToSocketAddrs, { let sockets = bind_addr(addr)?; for lst in sockets { self.listen(name.as_ref(), lst); } Ok(self) } /// Add new service to server pub fn listen>(&mut self, name: N, lst: net::TcpListener) -> &mut Self { if self.apply.is_none() { self.apply = Some(Box::new(not_configured)); } self.services.push((name.as_ref().to_string(), lst)); self } /// Register service configuration function. This function get called /// during worker runtime configuration. It get executed in worker thread. pub fn apply(&mut self, f: F) -> io::Result<()> where F: Fn(&mut ServiceRuntime) + Send + Clone + 'static, { self.apply = Some(Box::new(f)); Ok(()) } } pub(super) struct ConfiguredService { rt: Box, names: HashMap, services: HashMap, } impl ConfiguredService { pub(super) fn new(rt: Box) -> Self { ConfiguredService { rt, names: HashMap::new(), services: HashMap::new(), } } pub(super) fn stream(&mut self, token: Token, name: String) { self.names.insert(token, name.clone()); self.services.insert(name, token); } } impl InternalServiceFactory for ConfiguredService { fn name(&self, token: Token) -> &str { &self.names[&token] } fn clone_factory(&self) -> Box { Box::new(Self { rt: self.rt.clone(), names: self.names.clone(), services: self.services.clone(), }) } fn create(&self) -> Box, Error = ()>> { // configure services let mut rt = ServiceRuntime::new(self.services.clone()); self.rt.configure(&mut rt); rt.validate(); // construct services let mut fut = Vec::new(); for (token, ns) in rt.services { fut.push(ns.new_service(&()).map(move |service| (token, service))); } Box::new(join_all(fut).map_err(|e| { error!("Can not construct service: {:?}", e); })) } } pub(super) trait ServiceRuntimeConfiguration: Send { fn clone(&self) -> Box; fn configure(&self, rt: &mut ServiceRuntime); } impl ServiceRuntimeConfiguration for F where F: Fn(&mut ServiceRuntime) + Send + Clone + 'static, { fn clone(&self) -> Box { Box::new(self.clone()) } fn configure(&self, rt: &mut ServiceRuntime) { (self)(rt) } } fn not_configured(_: &mut ServiceRuntime) { error!("Service is not configured"); } pub struct ServiceRuntime { names: HashMap, services: HashMap, } impl ServiceRuntime { fn new(names: HashMap) -> Self { ServiceRuntime { names, services: HashMap::new(), } } fn validate(&self) { for (name, token) in &self.names { if !self.services.contains_key(&token) { error!("Service {:?} is not configured", name); } } } pub fn service(&mut self, name: &str, service: F) where F: IntoNewService, T: NewService + 'static, T::Future: 'static, T::Service: 'static, T::InitError: fmt::Debug, { // let name = name.to_owned(); if let Some(token) = self.names.get(name) { self.services.insert( token.clone(), Box::new(ServiceFactory { inner: service.into_new_service(), }), ); } else { panic!("Unknown service: {:?}", name); } } } type BoxedNewService = Box< NewService< Request = (Option, ServerMessage), Response = (), Error = (), InitError = (), Service = BoxedServerService, Future = Box>, >, >; struct ServiceFactory { inner: T, } impl NewService for ServiceFactory where T: NewService, T::Future: 'static, T::Service: 'static, T::Error: 'static, T::InitError: fmt::Debug + 'static, { type Request = (Option, ServerMessage); type Response = (); type Error = (); type InitError = (); type Service = BoxedServerService; type Future = Box>; fn new_service(&self, _: &()) -> Self::Future { Box::new( self.inner .new_service(&()) .into_future() .map_err(|_| ()) .map(|s| { let service: BoxedServerService = Box::new(StreamService::new(s)); service }), ) } }