1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-27 16:52:58 +01:00

refactor server service configuration protcess

This commit is contained in:
Nikolay Kim 2018-11-03 09:09:14 -07:00
parent 0e3d1068da
commit 1ac018dc79
5 changed files with 349 additions and 64 deletions

213
src/server/config.rs Normal file
View File

@ -0,0 +1,213 @@
use std::collections::HashMap;
use std::{fmt, io, net};
use futures::future::{join_all, Future};
use tokio_tcp::TcpStream;
use counter::CounterGuard;
use service::{IntoNewService, NewService};
use super::server::bind_addr;
use super::services::{
BoxedServerService, InternalServiceFactory, ServerMessage, StreamService,
};
use super::Token;
pub struct ServiceConfig {
pub(super) services: Vec<(String, net::TcpListener)>,
pub(super) rt: Box<ServiceRuntimeConfiguration>,
}
impl ServiceConfig {
pub(super) fn new() -> ServiceConfig {
ServiceConfig {
services: Vec::new(),
rt: Box::new(not_configured),
}
}
/// Add new service to server
pub fn bind<U, N: AsRef<str>>(&mut self, name: N, addr: U) -> io::Result<&mut Self>
where
U: net::ToSocketAddrs,
{
let sockets = bind_addr(addr)?;
for lst in sockets {
self.listen(name.as_ref(), lst);
}
Ok(self)
}
/// Add new service to server
pub fn listen<N: AsRef<str>>(&mut self, name: N, lst: net::TcpListener) -> &mut Self {
self.services.push((name.as_ref().to_string(), lst));
self
}
/// Register service configuration function
pub fn rt<F>(&mut self, f: F) -> io::Result<()>
where
F: Fn(&mut ServiceRuntime) + Send + Clone + 'static,
{
self.rt = Box::new(f);
Ok(())
}
}
pub(super) struct ConfiguredService {
rt: Box<ServiceRuntimeConfiguration>,
names: HashMap<Token, String>,
services: HashMap<String, Token>,
}
impl ConfiguredService {
pub(super) fn new(rt: Box<ServiceRuntimeConfiguration>) -> Self {
ConfiguredService {
rt,
names: HashMap::new(),
services: HashMap::new(),
}
}
pub(super) fn stream(&mut self, token: Token, name: String) {
self.names.insert(token, name.clone());
self.services.insert(name, token);
}
}
impl InternalServiceFactory for ConfiguredService {
fn name(&self, token: Token) -> &str {
&self.names[&token]
}
fn clone_factory(&self) -> Box<InternalServiceFactory> {
Box::new(Self {
rt: self.rt.clone(),
names: self.names.clone(),
services: self.services.clone(),
})
}
fn create(&self) -> Box<Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> {
// configure services
let mut rt = ServiceRuntime::new(self.services.clone());
self.rt.configure(&mut rt);
rt.validate();
// construct services
let mut fut = Vec::new();
for (token, ns) in rt.services {
fut.push(ns.new_service().map(move |service| (token, service)));
}
Box::new(join_all(fut).map_err(|e| {
error!("Can not construct service: {:?}", e);
}))
}
}
pub(super) trait ServiceRuntimeConfiguration: Send {
fn clone(&self) -> Box<ServiceRuntimeConfiguration>;
fn configure(&self, &mut ServiceRuntime);
}
impl<F> ServiceRuntimeConfiguration for F
where
F: Fn(&mut ServiceRuntime) + Send + Clone + 'static,
{
fn clone(&self) -> Box<ServiceRuntimeConfiguration> {
Box::new(self.clone())
}
fn configure(&self, rt: &mut ServiceRuntime) {
(self)(rt)
}
}
fn not_configured(_: &mut ServiceRuntime) {
error!("Service is not configured");
}
pub struct ServiceRuntime {
names: HashMap<String, Token>,
services: HashMap<Token, BoxedNewService>,
}
impl ServiceRuntime {
fn new(names: HashMap<String, Token>) -> Self {
ServiceRuntime {
names,
services: HashMap::new(),
}
}
fn validate(&self) {
for (name, token) in &self.names {
if !self.services.contains_key(&token) {
error!("Service {:?} is not configured", name);
}
}
}
pub fn service<T, F>(&mut self, name: &str, service: F)
where
F: IntoNewService<T>,
T: NewService<Request = TcpStream, Response = ()> + 'static,
T::Future: 'static,
T::Service: 'static,
T::InitError: fmt::Debug,
{
// let name = name.to_owned();
if let Some(token) = self.names.get(name) {
self.services.insert(
token.clone(),
Box::new(ServiceFactory {
inner: service.into_new_service(),
}),
);
} else {
panic!("Unknown service: {:?}", name);
}
}
}
type BoxedNewService = Box<
NewService<
Request = (Option<CounterGuard>, ServerMessage),
Response = (),
Error = (),
InitError = (),
Service = BoxedServerService,
Future = Box<Future<Item = BoxedServerService, Error = ()>>,
>,
>;
struct ServiceFactory<T> {
inner: T,
}
impl<T> NewService for ServiceFactory<T>
where
T: NewService<Request = TcpStream, Response = ()>,
T::Future: 'static,
T::Service: 'static,
T::Error: 'static,
T::InitError: fmt::Debug + 'static,
{
type Request = (Option<CounterGuard>, ServerMessage);
type Response = ();
type Error = ();
type InitError = ();
type Service = BoxedServerService;
type Future = Box<Future<Item = BoxedServerService, Error = ()>>;
fn new_service(&self) -> Self::Future {
Box::new(self.inner.new_service().map_err(|_| ()).map(|s| {
let service: BoxedServerService = Box::new(StreamService::new(s));
service
}))
}
}

View File

@ -3,10 +3,12 @@
use actix::Message;
mod accept;
mod config;
mod server;
mod services;
mod worker;
pub use self::config::{ServiceConfig, ServiceRuntime};
pub use self::server::Server;
pub use self::services::{ServerMessage, ServiceFactory, StreamServiceFactory};
@ -34,5 +36,11 @@ impl Message for StopServer {
}
/// Socket id token
#[derive(Clone, Copy, Debug)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub(crate) struct Token(usize);
impl Token {
pub(crate) fn next(&self) -> Token {
Token(self.0 + 1)
}
}

View File

@ -12,6 +12,7 @@ use actix::{
};
use super::accept::{AcceptLoop, AcceptNotify, Command};
use super::config::{ConfiguredService, ServiceConfig};
use super::services::{InternalServiceFactory, StreamNewService, StreamServiceFactory};
use super::services::{ServiceFactory, ServiceNewService};
use super::worker::{self, Worker, WorkerAvailability, WorkerClient};
@ -24,6 +25,7 @@ pub(crate) enum ServerCommand {
/// Server
pub struct Server {
threads: usize,
token: Token,
workers: Vec<(usize, WorkerClient)>,
services: Vec<Box<InternalServiceFactory>>,
sockets: Vec<(Token, net::TcpListener)>,
@ -45,6 +47,7 @@ impl Server {
pub fn new() -> Server {
Server {
threads: num_cpus::get(),
token: Token(0),
workers: Vec::new(),
services: Vec::new(),
sockets: Vec::new(),
@ -113,12 +116,24 @@ impl Server {
/// process
///
/// This function is useful for moving parts of configuration to a
/// different module or event library.
pub fn configure<F>(self, cfg: F) -> Server
/// different module or even library.
pub fn configure<F>(mut self, f: F) -> io::Result<Server>
where
F: Fn(Server) -> Server,
F: Fn(&mut ServiceConfig) -> io::Result<()>,
{
cfg(self)
let mut cfg = ServiceConfig::new();
f(&mut cfg)?;
let mut srv = ConfiguredService::new(cfg.rt);
for (name, lst) in cfg.services {
let token = self.token.next();
srv.stream(token, name);
self.sockets.push((token, lst));
}
self.services.push(Box::new(srv));
Ok(self)
}
/// Add new service to server
@ -145,9 +160,12 @@ impl Server {
where
F: StreamServiceFactory,
{
let token = Token(self.services.len());
self.services
.push(StreamNewService::create(name.as_ref().to_string(), factory));
let token = self.token.next();
self.services.push(StreamNewService::create(
name.as_ref().to_string(),
token,
factory,
));
self.sockets.push((token, lst));
self
}
@ -162,9 +180,10 @@ impl Server {
where
F: ServiceFactory,
{
let token = Token(self.services.len());
let token = self.token.next();
self.services.push(ServiceNewService::create(
name.as_ref().to_string(),
token,
factory,
));
self.sockets.push((token, lst));
@ -403,7 +422,7 @@ impl StreamHandler<ServerCommand, ()> for Server {
}
}
fn bind_addr<S: net::ToSocketAddrs>(addr: S) -> io::Result<Vec<net::TcpListener>> {
pub(super) fn bind_addr<S: net::ToSocketAddrs>(addr: S) -> io::Result<Vec<net::TcpListener>> {
let mut err = None;
let mut succ = false;
let mut sockets = Vec::new();

View File

@ -7,6 +7,7 @@ use tokio_current_thread::spawn;
use tokio_reactor::Handle;
use tokio_tcp::TcpStream;
use super::Token;
use counter::CounterGuard;
use service::{NewService, Service};
@ -33,11 +34,11 @@ pub trait ServiceFactory: Send + Clone + 'static {
}
pub(crate) trait InternalServiceFactory: Send {
fn name(&self) -> &str;
fn name(&self, token: Token) -> &str;
fn clone_factory(&self) -> Box<InternalServiceFactory>;
fn create(&self) -> Box<Future<Item = BoxedServerService, Error = ()>>;
fn create(&self) -> Box<Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>>;
}
pub(crate) type BoxedServerService = Box<
@ -54,7 +55,7 @@ pub(crate) struct StreamService<T> {
}
impl<T> StreamService<T> {
fn new(service: T) -> Self {
pub(crate) fn new(service: T) -> Self {
StreamService { service }
}
}
@ -133,14 +134,15 @@ where
pub(crate) struct ServiceNewService<F: ServiceFactory> {
name: String,
inner: F,
token: Token,
}
impl<F> ServiceNewService<F>
where
F: ServiceFactory,
{
pub(crate) fn create(name: String, inner: F) -> Box<InternalServiceFactory> {
Box::new(Self { name, inner })
pub(crate) fn create(name: String, token: Token, inner: F) -> Box<InternalServiceFactory> {
Box::new(Self { name, inner, token })
}
}
@ -148,7 +150,7 @@ impl<F> InternalServiceFactory for ServiceNewService<F>
where
F: ServiceFactory,
{
fn name(&self) -> &str {
fn name(&self, _: Token) -> &str {
&self.name
}
@ -156,10 +158,12 @@ where
Box::new(Self {
name: self.name.clone(),
inner: self.inner.clone(),
token: self.token,
})
}
fn create(&self) -> Box<Future<Item = BoxedServerService, Error = ()>> {
fn create(&self) -> Box<Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> {
let token = self.token;
Box::new(
self.inner
.create()
@ -167,7 +171,7 @@ where
.map_err(|_| ())
.map(move |inner| {
let service: BoxedServerService = Box::new(ServerService::new(inner));
service
vec![(token, service)]
}),
)
}
@ -176,14 +180,15 @@ where
pub(crate) struct StreamNewService<F: StreamServiceFactory> {
name: String,
inner: F,
token: Token,
}
impl<F> StreamNewService<F>
where
F: StreamServiceFactory,
{
pub(crate) fn create(name: String, inner: F) -> Box<InternalServiceFactory> {
Box::new(Self { name, inner })
pub(crate) fn create(name: String, token: Token, inner: F) -> Box<InternalServiceFactory> {
Box::new(Self { name, token, inner })
}
}
@ -191,7 +196,7 @@ impl<F> InternalServiceFactory for StreamNewService<F>
where
F: StreamServiceFactory,
{
fn name(&self) -> &str {
fn name(&self, _: Token) -> &str {
&self.name
}
@ -199,10 +204,12 @@ where
Box::new(Self {
name: self.name.clone(),
inner: self.inner.clone(),
token: self.token,
})
}
fn create(&self) -> Box<Future<Item = BoxedServerService, Error = ()>> {
fn create(&self) -> Box<Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> {
let token = self.token;
Box::new(
self.inner
.create()
@ -210,22 +217,22 @@ where
.map_err(|_| ())
.map(move |inner| {
let service: BoxedServerService = Box::new(StreamService::new(inner));
service
vec![(token, service)]
}),
)
}
}
impl InternalServiceFactory for Box<InternalServiceFactory> {
fn name(&self) -> &str {
self.as_ref().name()
fn name(&self, token: Token) -> &str {
self.as_ref().name(token)
}
fn clone_factory(&self) -> Box<InternalServiceFactory> {
self.as_ref().clone_factory()
}
fn create(&self) -> Box<Future<Item = BoxedServerService, Error = ()>> {
fn create(&self) -> Box<Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> {
self.as_ref().create()
}
}

View File

@ -127,7 +127,7 @@ impl WorkerAvailability {
pub(crate) struct Worker {
rx: UnboundedReceiver<WorkerCommand>,
rx2: UnboundedReceiver<StopCommand>,
services: Vec<BoxedServerService>,
services: Vec<Option<(usize, BoxedServerService)>>,
availability: WorkerAvailability,
conns: Counter,
factories: Vec<Box<InternalServiceFactory>>,
@ -156,8 +156,12 @@ impl Worker {
});
let mut fut = Vec::new();
for factory in &wrk.factories {
fut.push(factory.create());
for (idx, factory) in wrk.factories.iter().enumerate() {
fut.push(factory.create().map(move |res| {
res.into_iter()
.map(|(t, s)| (idx, t, s))
.collect::<Vec<_>>()
}));
}
spawn(
future::join_all(fut)
@ -165,7 +169,14 @@ impl Worker {
error!("Can not start worker: {:?}", e);
Arbiter::current().do_send(StopArbiter(0));
}).and_then(move |services| {
wrk.services.extend(services);
for item in services {
for (idx, token, service) in item {
while token.0 >= wrk.services.len() {
wrk.services.push(None);
}
wrk.services[token.0] = Some((idx, service));
}
}
wrk
}),
);
@ -174,33 +185,42 @@ impl Worker {
fn shutdown(&mut self, force: bool) {
if force {
self.services.iter_mut().for_each(|h| {
let _ = h.call((None, ServerMessage::ForceShutdown));
if let Some(h) = h {
let _ = h.1.call((None, ServerMessage::ForceShutdown));
}
});
} else {
let timeout = self.shutdown_timeout;
self.services.iter_mut().for_each(move |h| {
let _ = h.call((None, ServerMessage::Shutdown(timeout.clone())));
if let Some(h) = h {
let _ = h.1.call((None, ServerMessage::Shutdown(timeout.clone())));
}
});
}
}
fn check_readiness(&mut self, trace: bool) -> Result<bool, usize> {
fn check_readiness(&mut self, trace: bool) -> Result<bool, (Token, usize)> {
let mut ready = self.conns.available();
let mut failed = None;
for (idx, service) in self.services.iter_mut().enumerate() {
match service.poll_ready() {
for (token, service) in &mut self.services.iter_mut().enumerate() {
if let Some(service) = service {
match service.1.poll_ready() {
Ok(Async::Ready(_)) => {
if trace {
trace!("Service {:?} is available", self.factories[idx].name());
trace!(
"Service {:?} is available",
self.factories[service.0].name(Token(token))
);
}
}
Ok(Async::NotReady) => ready = false,
Err(_) => {
error!(
"Service {:?} readiness check returned error, restarting",
self.factories[idx].name()
self.factories[service.0].name(Token(token))
);
failed = Some(idx);
failed = Some((Token(token), service.0));
}
}
}
}
@ -216,7 +236,11 @@ enum WorkerState {
None,
Available,
Unavailable(Vec<Conn>),
Restarting(usize, Box<Future<Item = BoxedServerService, Error = ()>>),
Restarting(
usize,
Token,
Box<Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>>,
),
Shutdown(Delay, Delay, oneshot::Sender<bool>),
}
@ -272,6 +296,9 @@ impl Future for Worker {
Ok(true) => {
let guard = self.conns.get();
let _ = self.services[msg.handler.0]
.as_mut()
.expect("actix net bug")
.1
.call((Some(guard), ServerMessage::Connect(msg.io)));
}
Ok(false) => {
@ -279,13 +306,14 @@ impl Future for Worker {
self.state = WorkerState::Unavailable(conns);
return self.poll();
}
Err(idx) => {
Err((token, idx)) => {
trace!(
"Service {:?} failed, restarting",
self.factories[idx].name()
self.factories[idx].name(token)
);
self.state = WorkerState::Restarting(
idx,
token,
self.factories[idx].create(),
);
return self.poll();
@ -299,32 +327,38 @@ impl Future for Worker {
self.state = WorkerState::Unavailable(conns);
return Ok(Async::NotReady);
}
Err(idx) => {
Err((token, idx)) => {
trace!(
"Service {:?} failed, restarting",
self.factories[idx].name()
self.factories[idx].name(token)
);
self.state = WorkerState::Restarting(idx, self.factories[idx].create());
self.state =
WorkerState::Restarting(idx, token, self.factories[idx].create());
return self.poll();
}
}
}
WorkerState::Restarting(idx, mut fut) => {
WorkerState::Restarting(idx, token, mut fut) => {
match fut.poll() {
Ok(Async::Ready(service)) => {
Ok(Async::Ready(item)) => {
for (token, service) in item {
trace!(
"Service {:?} has been restarted",
self.factories[idx].name()
self.factories[idx].name(token)
);
self.services[idx] = service;
self.services[token.0] = Some((idx, service));
self.state = WorkerState::Unavailable(Vec::new());
}
}
Ok(Async::NotReady) => {
self.state = WorkerState::Restarting(idx, fut);
self.state = WorkerState::Restarting(idx, token, fut);
return Ok(Async::NotReady);
}
Err(_) => {
panic!("Can not restart {:?} service", self.factories[idx].name());
panic!(
"Can not restart {:?} service",
self.factories[idx].name(token)
);
}
}
return self.poll();
@ -368,6 +402,9 @@ impl Future for Worker {
Ok(true) => {
let guard = self.conns.get();
let _ = self.services[msg.handler.0]
.as_mut()
.expect("actix net bug")
.1
.call((Some(guard), ServerMessage::Connect(msg.io)));
continue;
}
@ -376,14 +413,15 @@ impl Future for Worker {
self.availability.set(false);
self.state = WorkerState::Unavailable(vec![msg]);
}
Err(idx) => {
Err((token, idx)) => {
trace!(
"Service {:?} failed, restarting",
self.factories[idx].name()
self.factories[idx].name(token)
);
self.availability.set(false);
self.state = WorkerState::Restarting(
idx,
token,
self.factories[idx].create(),
);
}