diff --git a/src/server/config.rs b/src/server/config.rs new file mode 100644 index 00000000..6fba8f92 --- /dev/null +++ b/src/server/config.rs @@ -0,0 +1,213 @@ +use std::collections::HashMap; +use std::{fmt, io, net}; + +use futures::future::{join_all, Future}; +use tokio_tcp::TcpStream; + +use counter::CounterGuard; +use service::{IntoNewService, NewService}; + +use super::server::bind_addr; +use super::services::{ + BoxedServerService, InternalServiceFactory, ServerMessage, StreamService, +}; +use super::Token; + +pub struct ServiceConfig { + pub(super) services: Vec<(String, net::TcpListener)>, + pub(super) rt: Box, +} + +impl ServiceConfig { + pub(super) fn new() -> ServiceConfig { + ServiceConfig { + services: Vec::new(), + rt: Box::new(not_configured), + } + } + + /// Add new service to server + pub fn bind>(&mut self, name: N, addr: U) -> io::Result<&mut Self> + where + U: net::ToSocketAddrs, + { + let sockets = bind_addr(addr)?; + + for lst in sockets { + self.listen(name.as_ref(), lst); + } + + Ok(self) + } + + /// Add new service to server + pub fn listen>(&mut self, name: N, lst: net::TcpListener) -> &mut Self { + self.services.push((name.as_ref().to_string(), lst)); + self + } + + /// Register service configuration function + pub fn rt(&mut self, f: F) -> io::Result<()> + where + F: Fn(&mut ServiceRuntime) + Send + Clone + 'static, + { + self.rt = Box::new(f); + Ok(()) + } +} + +pub(super) struct ConfiguredService { + rt: Box, + names: HashMap, + services: HashMap, +} + +impl ConfiguredService { + pub(super) fn new(rt: Box) -> Self { + ConfiguredService { + rt, + names: HashMap::new(), + services: HashMap::new(), + } + } + + pub(super) fn stream(&mut self, token: Token, name: String) { + self.names.insert(token, name.clone()); + self.services.insert(name, token); + } +} + +impl InternalServiceFactory for ConfiguredService { + fn name(&self, token: Token) -> &str { + &self.names[&token] + } + + fn clone_factory(&self) -> Box { + Box::new(Self { + rt: self.rt.clone(), + names: self.names.clone(), + services: self.services.clone(), + }) + } + + fn create(&self) -> Box, Error = ()>> { + // configure services + let mut rt = ServiceRuntime::new(self.services.clone()); + self.rt.configure(&mut rt); + rt.validate(); + + // construct services + let mut fut = Vec::new(); + for (token, ns) in rt.services { + fut.push(ns.new_service().map(move |service| (token, service))); + } + + Box::new(join_all(fut).map_err(|e| { + error!("Can not construct service: {:?}", e); + })) + } +} + +pub(super) trait ServiceRuntimeConfiguration: Send { + fn clone(&self) -> Box; + + fn configure(&self, &mut ServiceRuntime); +} + +impl ServiceRuntimeConfiguration for F +where + F: Fn(&mut ServiceRuntime) + Send + Clone + 'static, +{ + fn clone(&self) -> Box { + Box::new(self.clone()) + } + + fn configure(&self, rt: &mut ServiceRuntime) { + (self)(rt) + } +} + +fn not_configured(_: &mut ServiceRuntime) { + error!("Service is not configured"); +} + +pub struct ServiceRuntime { + names: HashMap, + services: HashMap, +} + +impl ServiceRuntime { + fn new(names: HashMap) -> Self { + ServiceRuntime { + names, + services: HashMap::new(), + } + } + + fn validate(&self) { + for (name, token) in &self.names { + if !self.services.contains_key(&token) { + error!("Service {:?} is not configured", name); + } + } + } + + pub fn service(&mut self, name: &str, service: F) + where + F: IntoNewService, + T: NewService + 'static, + T::Future: 'static, + T::Service: 'static, + T::InitError: fmt::Debug, + { + // let name = name.to_owned(); + if let Some(token) = self.names.get(name) { + self.services.insert( + token.clone(), + Box::new(ServiceFactory { + inner: service.into_new_service(), + }), + ); + } else { + panic!("Unknown service: {:?}", name); + } + } +} + +type BoxedNewService = Box< + NewService< + Request = (Option, ServerMessage), + Response = (), + Error = (), + InitError = (), + Service = BoxedServerService, + Future = Box>, + >, +>; + +struct ServiceFactory { + inner: T, +} + +impl NewService for ServiceFactory +where + T: NewService, + T::Future: 'static, + T::Service: 'static, + T::Error: 'static, + T::InitError: fmt::Debug + 'static, +{ + type Request = (Option, ServerMessage); + type Response = (); + type Error = (); + type InitError = (); + type Service = BoxedServerService; + type Future = Box>; + + fn new_service(&self) -> Self::Future { + Box::new(self.inner.new_service().map_err(|_| ()).map(|s| { + let service: BoxedServerService = Box::new(StreamService::new(s)); + service + })) + } +} diff --git a/src/server/mod.rs b/src/server/mod.rs index 0a6f9885..a76f8535 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -3,10 +3,12 @@ use actix::Message; mod accept; +mod config; mod server; mod services; mod worker; +pub use self::config::{ServiceConfig, ServiceRuntime}; pub use self::server::Server; pub use self::services::{ServerMessage, ServiceFactory, StreamServiceFactory}; @@ -34,5 +36,11 @@ impl Message for StopServer { } /// Socket id token -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] pub(crate) struct Token(usize); + +impl Token { + pub(crate) fn next(&self) -> Token { + Token(self.0 + 1) + } +} diff --git a/src/server/server.rs b/src/server/server.rs index 834fa747..874253f6 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -12,6 +12,7 @@ use actix::{ }; use super::accept::{AcceptLoop, AcceptNotify, Command}; +use super::config::{ConfiguredService, ServiceConfig}; use super::services::{InternalServiceFactory, StreamNewService, StreamServiceFactory}; use super::services::{ServiceFactory, ServiceNewService}; use super::worker::{self, Worker, WorkerAvailability, WorkerClient}; @@ -24,6 +25,7 @@ pub(crate) enum ServerCommand { /// Server pub struct Server { threads: usize, + token: Token, workers: Vec<(usize, WorkerClient)>, services: Vec>, sockets: Vec<(Token, net::TcpListener)>, @@ -45,6 +47,7 @@ impl Server { pub fn new() -> Server { Server { threads: num_cpus::get(), + token: Token(0), workers: Vec::new(), services: Vec::new(), sockets: Vec::new(), @@ -113,12 +116,24 @@ impl Server { /// process /// /// This function is useful for moving parts of configuration to a - /// different module or event library. - pub fn configure(self, cfg: F) -> Server + /// different module or even library. + pub fn configure(mut self, f: F) -> io::Result where - F: Fn(Server) -> Server, + F: Fn(&mut ServiceConfig) -> io::Result<()>, { - cfg(self) + let mut cfg = ServiceConfig::new(); + + f(&mut cfg)?; + + let mut srv = ConfiguredService::new(cfg.rt); + for (name, lst) in cfg.services { + let token = self.token.next(); + srv.stream(token, name); + self.sockets.push((token, lst)); + } + self.services.push(Box::new(srv)); + + Ok(self) } /// Add new service to server @@ -145,9 +160,12 @@ impl Server { where F: StreamServiceFactory, { - let token = Token(self.services.len()); - self.services - .push(StreamNewService::create(name.as_ref().to_string(), factory)); + let token = self.token.next(); + self.services.push(StreamNewService::create( + name.as_ref().to_string(), + token, + factory, + )); self.sockets.push((token, lst)); self } @@ -162,9 +180,10 @@ impl Server { where F: ServiceFactory, { - let token = Token(self.services.len()); + let token = self.token.next(); self.services.push(ServiceNewService::create( name.as_ref().to_string(), + token, factory, )); self.sockets.push((token, lst)); @@ -403,7 +422,7 @@ impl StreamHandler for Server { } } -fn bind_addr(addr: S) -> io::Result> { +pub(super) fn bind_addr(addr: S) -> io::Result> { let mut err = None; let mut succ = false; let mut sockets = Vec::new(); diff --git a/src/server/services.rs b/src/server/services.rs index 3fa3e0c6..58be7f73 100644 --- a/src/server/services.rs +++ b/src/server/services.rs @@ -7,6 +7,7 @@ use tokio_current_thread::spawn; use tokio_reactor::Handle; use tokio_tcp::TcpStream; +use super::Token; use counter::CounterGuard; use service::{NewService, Service}; @@ -33,11 +34,11 @@ pub trait ServiceFactory: Send + Clone + 'static { } pub(crate) trait InternalServiceFactory: Send { - fn name(&self) -> &str; + fn name(&self, token: Token) -> &str; fn clone_factory(&self) -> Box; - fn create(&self) -> Box>; + fn create(&self) -> Box, Error = ()>>; } pub(crate) type BoxedServerService = Box< @@ -54,7 +55,7 @@ pub(crate) struct StreamService { } impl StreamService { - fn new(service: T) -> Self { + pub(crate) fn new(service: T) -> Self { StreamService { service } } } @@ -133,14 +134,15 @@ where pub(crate) struct ServiceNewService { name: String, inner: F, + token: Token, } impl ServiceNewService where F: ServiceFactory, { - pub(crate) fn create(name: String, inner: F) -> Box { - Box::new(Self { name, inner }) + pub(crate) fn create(name: String, token: Token, inner: F) -> Box { + Box::new(Self { name, inner, token }) } } @@ -148,7 +150,7 @@ impl InternalServiceFactory for ServiceNewService where F: ServiceFactory, { - fn name(&self) -> &str { + fn name(&self, _: Token) -> &str { &self.name } @@ -156,10 +158,12 @@ where Box::new(Self { name: self.name.clone(), inner: self.inner.clone(), + token: self.token, }) } - fn create(&self) -> Box> { + fn create(&self) -> Box, Error = ()>> { + let token = self.token; Box::new( self.inner .create() @@ -167,7 +171,7 @@ where .map_err(|_| ()) .map(move |inner| { let service: BoxedServerService = Box::new(ServerService::new(inner)); - service + vec![(token, service)] }), ) } @@ -176,14 +180,15 @@ where pub(crate) struct StreamNewService { name: String, inner: F, + token: Token, } impl StreamNewService where F: StreamServiceFactory, { - pub(crate) fn create(name: String, inner: F) -> Box { - Box::new(Self { name, inner }) + pub(crate) fn create(name: String, token: Token, inner: F) -> Box { + Box::new(Self { name, token, inner }) } } @@ -191,7 +196,7 @@ impl InternalServiceFactory for StreamNewService where F: StreamServiceFactory, { - fn name(&self) -> &str { + fn name(&self, _: Token) -> &str { &self.name } @@ -199,10 +204,12 @@ where Box::new(Self { name: self.name.clone(), inner: self.inner.clone(), + token: self.token, }) } - fn create(&self) -> Box> { + fn create(&self) -> Box, Error = ()>> { + let token = self.token; Box::new( self.inner .create() @@ -210,22 +217,22 @@ where .map_err(|_| ()) .map(move |inner| { let service: BoxedServerService = Box::new(StreamService::new(inner)); - service + vec![(token, service)] }), ) } } impl InternalServiceFactory for Box { - fn name(&self) -> &str { - self.as_ref().name() + fn name(&self, token: Token) -> &str { + self.as_ref().name(token) } fn clone_factory(&self) -> Box { self.as_ref().clone_factory() } - fn create(&self) -> Box> { + fn create(&self) -> Box, Error = ()>> { self.as_ref().create() } } diff --git a/src/server/worker.rs b/src/server/worker.rs index e9d8f8c8..3b5eb630 100644 --- a/src/server/worker.rs +++ b/src/server/worker.rs @@ -127,7 +127,7 @@ impl WorkerAvailability { pub(crate) struct Worker { rx: UnboundedReceiver, rx2: UnboundedReceiver, - services: Vec, + services: Vec>, availability: WorkerAvailability, conns: Counter, factories: Vec>, @@ -156,8 +156,12 @@ impl Worker { }); let mut fut = Vec::new(); - for factory in &wrk.factories { - fut.push(factory.create()); + for (idx, factory) in wrk.factories.iter().enumerate() { + fut.push(factory.create().map(move |res| { + res.into_iter() + .map(|(t, s)| (idx, t, s)) + .collect::>() + })); } spawn( future::join_all(fut) @@ -165,7 +169,14 @@ impl Worker { error!("Can not start worker: {:?}", e); Arbiter::current().do_send(StopArbiter(0)); }).and_then(move |services| { - wrk.services.extend(services); + for item in services { + for (idx, token, service) in item { + while token.0 >= wrk.services.len() { + wrk.services.push(None); + } + wrk.services[token.0] = Some((idx, service)); + } + } wrk }), ); @@ -174,33 +185,42 @@ impl Worker { fn shutdown(&mut self, force: bool) { if force { self.services.iter_mut().for_each(|h| { - let _ = h.call((None, ServerMessage::ForceShutdown)); + if let Some(h) = h { + let _ = h.1.call((None, ServerMessage::ForceShutdown)); + } }); } else { let timeout = self.shutdown_timeout; self.services.iter_mut().for_each(move |h| { - let _ = h.call((None, ServerMessage::Shutdown(timeout.clone()))); + if let Some(h) = h { + let _ = h.1.call((None, ServerMessage::Shutdown(timeout.clone()))); + } }); } } - fn check_readiness(&mut self, trace: bool) -> Result { + fn check_readiness(&mut self, trace: bool) -> Result { let mut ready = self.conns.available(); let mut failed = None; - for (idx, service) in self.services.iter_mut().enumerate() { - match service.poll_ready() { - Ok(Async::Ready(_)) => { - if trace { - trace!("Service {:?} is available", self.factories[idx].name()); + for (token, service) in &mut self.services.iter_mut().enumerate() { + if let Some(service) = service { + match service.1.poll_ready() { + Ok(Async::Ready(_)) => { + if trace { + trace!( + "Service {:?} is available", + self.factories[service.0].name(Token(token)) + ); + } + } + Ok(Async::NotReady) => ready = false, + Err(_) => { + error!( + "Service {:?} readiness check returned error, restarting", + self.factories[service.0].name(Token(token)) + ); + failed = Some((Token(token), service.0)); } - } - Ok(Async::NotReady) => ready = false, - Err(_) => { - error!( - "Service {:?} readiness check returned error, restarting", - self.factories[idx].name() - ); - failed = Some(idx); } } } @@ -216,7 +236,11 @@ enum WorkerState { None, Available, Unavailable(Vec), - Restarting(usize, Box>), + Restarting( + usize, + Token, + Box, Error = ()>>, + ), Shutdown(Delay, Delay, oneshot::Sender), } @@ -272,6 +296,9 @@ impl Future for Worker { Ok(true) => { let guard = self.conns.get(); let _ = self.services[msg.handler.0] + .as_mut() + .expect("actix net bug") + .1 .call((Some(guard), ServerMessage::Connect(msg.io))); } Ok(false) => { @@ -279,13 +306,14 @@ impl Future for Worker { self.state = WorkerState::Unavailable(conns); return self.poll(); } - Err(idx) => { + Err((token, idx)) => { trace!( "Service {:?} failed, restarting", - self.factories[idx].name() + self.factories[idx].name(token) ); self.state = WorkerState::Restarting( idx, + token, self.factories[idx].create(), ); return self.poll(); @@ -299,32 +327,38 @@ impl Future for Worker { self.state = WorkerState::Unavailable(conns); return Ok(Async::NotReady); } - Err(idx) => { + Err((token, idx)) => { trace!( "Service {:?} failed, restarting", - self.factories[idx].name() + self.factories[idx].name(token) ); - self.state = WorkerState::Restarting(idx, self.factories[idx].create()); + self.state = + WorkerState::Restarting(idx, token, self.factories[idx].create()); return self.poll(); } } } - WorkerState::Restarting(idx, mut fut) => { + WorkerState::Restarting(idx, token, mut fut) => { match fut.poll() { - Ok(Async::Ready(service)) => { - trace!( - "Service {:?} has been restarted", - self.factories[idx].name() - ); - self.services[idx] = service; - self.state = WorkerState::Unavailable(Vec::new()); + Ok(Async::Ready(item)) => { + for (token, service) in item { + trace!( + "Service {:?} has been restarted", + self.factories[idx].name(token) + ); + self.services[token.0] = Some((idx, service)); + self.state = WorkerState::Unavailable(Vec::new()); + } } Ok(Async::NotReady) => { - self.state = WorkerState::Restarting(idx, fut); + self.state = WorkerState::Restarting(idx, token, fut); return Ok(Async::NotReady); } Err(_) => { - panic!("Can not restart {:?} service", self.factories[idx].name()); + panic!( + "Can not restart {:?} service", + self.factories[idx].name(token) + ); } } return self.poll(); @@ -368,6 +402,9 @@ impl Future for Worker { Ok(true) => { let guard = self.conns.get(); let _ = self.services[msg.handler.0] + .as_mut() + .expect("actix net bug") + .1 .call((Some(guard), ServerMessage::Connect(msg.io))); continue; } @@ -376,14 +413,15 @@ impl Future for Worker { self.availability.set(false); self.state = WorkerState::Unavailable(vec![msg]); } - Err(idx) => { + Err((token, idx)) => { trace!( "Service {:?} failed, restarting", - self.factories[idx].name() + self.factories[idx].name(token) ); self.availability.set(false); self.state = WorkerState::Restarting( idx, + token, self.factories[idx].create(), ); }