From ed23caa3142eb1ca8af16569b147a9e8dbb95d81 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 17 Sep 2018 20:19:48 -0700 Subject: [PATCH] add name to service registration --- Cargo.toml | 3 +++ README.md | 2 +- examples/basic.rs | 7 +++++- examples/ssl.rs | 2 +- src/server/server.rs | 17 +++++++------ src/server/services.rs | 16 ++++++++++-- src/server/worker.rs | 56 ++++++++++++++++++++++++++++-------------- 7 files changed, 72 insertions(+), 31 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 484e1172..e95ffa04 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -72,6 +72,9 @@ tokio-rustls = { version = "^0.7.2", optional = true } webpki = { version = "0.18", optional = true } webpki-roots = { version = "0.15", optional = true } +[dev-dependencies] +env_logger = "0.5" + [profile.release] lto = true opt-level = 3 diff --git a/README.md b/README.md index 2df69e0e..851d8fa9 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,7 @@ fn main() { Server::default() .bind( // configure service pipeline - "0.0.0.0:8443", + "basic", "0.0.0.0:8443", move || { let num = num.clone(); let acceptor = acceptor.clone(); diff --git a/examples/basic.rs b/examples/basic.rs index ba073db2..34f6f30d 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -3,17 +3,18 @@ //! to test: curl https://127.0.0.1:8443/ -k extern crate actix; extern crate actix_net; +extern crate env_logger; extern crate futures; extern crate openssl; extern crate tokio_io; extern crate tokio_openssl; extern crate tokio_tcp; -use std::fmt; use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc, }; +use std::{env, fmt}; use futures::{future, Future}; use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod}; @@ -48,6 +49,9 @@ fn service( } fn main() { + env::set_var("RUST_LOG", "actix_net=trace"); + env_logger::init(); + let sys = actix::System::new("test"); // load ssl keys @@ -68,6 +72,7 @@ fn main() { Server::default() .bind( // configure service pipeline + "basic", "0.0.0.0:8443", move || { let num = num.clone(); diff --git a/examples/ssl.rs b/examples/ssl.rs index 86233c4f..91631e76 100644 --- a/examples/ssl.rs +++ b/examples/ssl.rs @@ -48,7 +48,7 @@ fn main() { // server start mutiple workers, it runs supplied `Fn` in each worker. Server::default() - .bind("0.0.0.0:8443", move || { + .bind("test-ssl", "0.0.0.0:8443", move || { let num = num.clone(); // configure service diff --git a/src/server/server.rs b/src/server/server.rs index 3afcbe73..6280faa9 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -121,7 +121,7 @@ impl Server { } /// Add new service to server - pub fn bind(mut self, addr: U, factory: F) -> io::Result + pub fn bind>(mut self, name: N, addr: U, factory: F) -> io::Result where F: ServerServiceFactory, U: net::ToSocketAddrs, @@ -129,18 +129,21 @@ impl Server { let sockets = bind_addr(addr)?; for lst in sockets { - self = self.listen(lst, factory.clone()) + self = self.listen(name.as_ref(), lst, factory.clone()) } Ok(self) } /// Add new service to server - pub fn listen(mut self, lst: net::TcpListener, factory: F) -> Self + pub fn listen>( + mut self, name: N, lst: net::TcpListener, factory: F, + ) -> Self where F: ServerServiceFactory, { let token = Token(self.services.len()); - self.services.push(ServerNewService::create(factory)); + self.services + .push(ServerNewService::create(name.as_ref().to_string(), factory)); self.sockets.push((token, lst)); self } @@ -178,7 +181,7 @@ impl Server { if self.sockets.is_empty() { panic!("Service should have at least one bound socket"); } else { - info!("Starting {} http workers", self.threads); + info!("Starting {} workers", self.threads); // start workers let mut workers = Vec::new(); @@ -190,7 +193,7 @@ impl Server { // start accept thread for sock in &self.sockets { - info!("Starting server on http://{:?}", sock.1.local_addr().ok()); + info!("Starting server on {}", sock.1.local_addr().ok().unwrap()); } let rx = self .accept @@ -229,7 +232,7 @@ impl Server { let services: Vec> = self.services.iter().map(|v| v.clone_factory()).collect(); - Arbiter::new(format!("actix-worker-{}", idx)).do_send(Execute::new(|| { + Arbiter::new(format!("actix-net-worker-{}", idx)).do_send(Execute::new(|| { Worker::start(rx, services, avail); Ok::<_, ()>(()) })); diff --git a/src/server/services.rs b/src/server/services.rs index 459a3447..7a986f7c 100644 --- a/src/server/services.rs +++ b/src/server/services.rs @@ -66,6 +66,7 @@ where } pub(crate) struct ServerNewService { + name: String, inner: F, } @@ -73,12 +74,14 @@ impl ServerNewService where F: ServerServiceFactory, { - pub(crate) fn create(inner: F) -> Box { - Box::new(Self { inner }) + pub(crate) fn create(name: String, inner: F) -> Box { + Box::new(Self { name, inner }) } } pub(crate) trait InternalServerServiceFactory: Send { + fn name(&self) -> &str; + fn clone_factory(&self) -> Box; fn create(&self) -> Box>; @@ -88,8 +91,13 @@ impl InternalServerServiceFactory for ServerNewService where F: ServerServiceFactory, { + fn name(&self) -> &str { + &self.name + } + fn clone_factory(&self) -> Box { Box::new(Self { + name: self.name.clone(), inner: self.inner.clone(), }) } @@ -103,6 +111,10 @@ where } impl InternalServerServiceFactory for Box { + fn name(&self) -> &str { + self.as_ref().name() + } + fn clone_factory(&self) -> Box { self.as_ref().clone_factory() } diff --git a/src/server/worker.rs b/src/server/worker.rs index 0e4e2224..b790de90 100644 --- a/src/server/worker.rs +++ b/src/server/worker.rs @@ -112,9 +112,9 @@ impl WorkerAvailability { } } -/// Http worker +/// Service worker /// -/// Worker accepts Socket objects via unbounded channel and start requests +/// Worker accepts Socket objects via unbounded channel and starts stream /// processing. pub(crate) struct Worker { rx: UnboundedReceiver, @@ -168,15 +168,22 @@ impl Worker { } } - fn check_readiness(&mut self) -> Result { + fn check_readiness(&mut self, trace: bool) -> Result { let mut ready = self.conns.available(); let mut failed = None; for (idx, service) in self.services.iter_mut().enumerate() { match service.poll_ready() { - Ok(Async::Ready(_)) => (), + Ok(Async::Ready(_)) => { + if trace { + trace!("Service {:?} is available", self.factories[idx].name()); + } + } Ok(Async::NotReady) => ready = false, Err(_) => { - error!("Service readiness check returned error, restarting"); + error!( + "Service {:?} readiness check returned error, restarting", + self.factories[idx].name() + ); failed = Some(idx); } } @@ -206,14 +213,13 @@ impl Future for Worker { match state { WorkerState::Unavailable(mut conns) => { - match self.check_readiness() { + match self.check_readiness(true) { Ok(true) => { - trace!("Serveice is available"); self.state = WorkerState::Available; // process requests from wait queue while let Some(msg) = conns.pop() { - match self.check_readiness() { + match self.check_readiness(false) { Ok(true) => { let guard = self.conns.get(); spawn( @@ -226,12 +232,15 @@ impl Future for Worker { ) } Ok(false) => { - trace!("Serveice is unavailable"); + trace!("Worker is unavailable"); self.state = WorkerState::Unavailable(conns); return self.poll(); } Err(idx) => { - trace!("Serveice failed, restarting"); + trace!( + "Service {:?} failed, restarting", + self.factories[idx].name() + ); self.state = WorkerState::Restarting( idx, self.factories[idx].create(), @@ -248,7 +257,10 @@ impl Future for Worker { return Ok(Async::NotReady); } Err(idx) => { - trace!("Serveice failed, restarting"); + trace!( + "Service {:?} failed, restarting", + self.factories[idx].name() + ); self.state = WorkerState::Restarting(idx, self.factories[idx].create()); return self.poll(); } @@ -257,7 +269,10 @@ impl Future for Worker { WorkerState::Restarting(idx, mut fut) => { match fut.poll() { Ok(Async::Ready(service)) => { - trace!("Service has been restarted"); + trace!( + "Service {:?} has been restarted", + self.factories[idx].name() + ); self.services[idx] = service; self.state = WorkerState::Unavailable(Vec::new()); } @@ -266,7 +281,7 @@ impl Future for Worker { return Ok(Async::NotReady); } Err(_) => { - panic!("Can not restart service"); + panic!("Can not restart {:?} service", self.factories[idx].name()); } } return self.poll(); @@ -306,7 +321,7 @@ impl Future for Worker { match self.rx.poll() { // handle incoming tcp stream Ok(Async::Ready(Some(WorkerCommand::Message(msg)))) => { - match self.check_readiness() { + match self.check_readiness(false) { Ok(true) => { let guard = self.conns.get(); spawn( @@ -320,12 +335,15 @@ impl Future for Worker { continue; } Ok(false) => { - trace!("Serveice is unsavailable"); + trace!("Worker is unavailable"); self.availability.set(false); self.state = WorkerState::Unavailable(vec![msg]); } Err(idx) => { - trace!("Serveice failed, restarting"); + trace!( + "Service {:?} failed, restarting", + self.factories[idx].name() + ); self.availability.set(false); self.state = WorkerState::Restarting( idx, @@ -340,14 +358,14 @@ impl Future for Worker { self.availability.set(false); let num = num_connections(); if num == 0 { - info!("Shutting down http worker, 0 connections"); + info!("Shutting down worker, 0 connections"); let _ = tx.send(true); return Ok(Async::Ready(())); } else if let Some(dur) = graceful { self.shutdown(false); let num = num_connections(); if num != 0 { - info!("Graceful http worker shutdown, {} connections", num); + info!("Graceful worker shutdown, {} connections", num); break Some(WorkerState::Shutdown( sleep(time::Duration::from_secs(1)), sleep(dur), @@ -358,7 +376,7 @@ impl Future for Worker { return Ok(Async::Ready(())); } } else { - info!("Force shutdown http worker, {} connections", num); + info!("Force shutdown worker, {} connections", num); self.shutdown(true); let _ = tx.send(false); return Ok(Async::Ready(()));