1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-27 20:12:58 +01:00

add name to service registration

This commit is contained in:
Nikolay Kim 2018-09-17 20:19:48 -07:00
parent 4827990298
commit ed23caa314
7 changed files with 72 additions and 31 deletions

View File

@ -72,6 +72,9 @@ tokio-rustls = { version = "^0.7.2", optional = true }
webpki = { version = "0.18", optional = true } webpki = { version = "0.18", optional = true }
webpki-roots = { version = "0.15", optional = true } webpki-roots = { version = "0.15", optional = true }
[dev-dependencies]
env_logger = "0.5"
[profile.release] [profile.release]
lto = true lto = true
opt-level = 3 opt-level = 3

View File

@ -29,7 +29,7 @@ fn main() {
Server::default() Server::default()
.bind( .bind(
// configure service pipeline // configure service pipeline
"0.0.0.0:8443", "basic", "0.0.0.0:8443",
move || { move || {
let num = num.clone(); let num = num.clone();
let acceptor = acceptor.clone(); let acceptor = acceptor.clone();

View File

@ -3,17 +3,18 @@
//! to test: curl https://127.0.0.1:8443/ -k //! to test: curl https://127.0.0.1:8443/ -k
extern crate actix; extern crate actix;
extern crate actix_net; extern crate actix_net;
extern crate env_logger;
extern crate futures; extern crate futures;
extern crate openssl; extern crate openssl;
extern crate tokio_io; extern crate tokio_io;
extern crate tokio_openssl; extern crate tokio_openssl;
extern crate tokio_tcp; extern crate tokio_tcp;
use std::fmt;
use std::sync::{ use std::sync::{
atomic::{AtomicUsize, Ordering}, atomic::{AtomicUsize, Ordering},
Arc, Arc,
}; };
use std::{env, fmt};
use futures::{future, Future}; use futures::{future, Future};
use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod}; use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod};
@ -48,6 +49,9 @@ fn service<T: AsyncRead + AsyncWrite>(
} }
fn main() { fn main() {
env::set_var("RUST_LOG", "actix_net=trace");
env_logger::init();
let sys = actix::System::new("test"); let sys = actix::System::new("test");
// load ssl keys // load ssl keys
@ -68,6 +72,7 @@ fn main() {
Server::default() Server::default()
.bind( .bind(
// configure service pipeline // configure service pipeline
"basic",
"0.0.0.0:8443", "0.0.0.0:8443",
move || { move || {
let num = num.clone(); let num = num.clone();

View File

@ -48,7 +48,7 @@ fn main() {
// server start mutiple workers, it runs supplied `Fn` in each worker. // server start mutiple workers, it runs supplied `Fn` in each worker.
Server::default() Server::default()
.bind("0.0.0.0:8443", move || { .bind("test-ssl", "0.0.0.0:8443", move || {
let num = num.clone(); let num = num.clone();
// configure service // configure service

View File

@ -121,7 +121,7 @@ impl Server {
} }
/// Add new service to server /// Add new service to server
pub fn bind<F, U>(mut self, addr: U, factory: F) -> io::Result<Self> pub fn bind<F, U, N: AsRef<str>>(mut self, name: N, addr: U, factory: F) -> io::Result<Self>
where where
F: ServerServiceFactory, F: ServerServiceFactory,
U: net::ToSocketAddrs, U: net::ToSocketAddrs,
@ -129,18 +129,21 @@ impl Server {
let sockets = bind_addr(addr)?; let sockets = bind_addr(addr)?;
for lst in sockets { for lst in sockets {
self = self.listen(lst, factory.clone()) self = self.listen(name.as_ref(), lst, factory.clone())
} }
Ok(self) Ok(self)
} }
/// Add new service to server /// Add new service to server
pub fn listen<F>(mut self, lst: net::TcpListener, factory: F) -> Self pub fn listen<F, N: AsRef<str>>(
mut self, name: N, lst: net::TcpListener, factory: F,
) -> Self
where where
F: ServerServiceFactory, F: ServerServiceFactory,
{ {
let token = Token(self.services.len()); let token = Token(self.services.len());
self.services.push(ServerNewService::create(factory)); self.services
.push(ServerNewService::create(name.as_ref().to_string(), factory));
self.sockets.push((token, lst)); self.sockets.push((token, lst));
self self
} }
@ -178,7 +181,7 @@ impl Server {
if self.sockets.is_empty() { if self.sockets.is_empty() {
panic!("Service should have at least one bound socket"); panic!("Service should have at least one bound socket");
} else { } else {
info!("Starting {} http workers", self.threads); info!("Starting {} workers", self.threads);
// start workers // start workers
let mut workers = Vec::new(); let mut workers = Vec::new();
@ -190,7 +193,7 @@ impl Server {
// start accept thread // start accept thread
for sock in &self.sockets { for sock in &self.sockets {
info!("Starting server on http://{:?}", sock.1.local_addr().ok()); info!("Starting server on {}", sock.1.local_addr().ok().unwrap());
} }
let rx = self let rx = self
.accept .accept
@ -229,7 +232,7 @@ impl Server {
let services: Vec<Box<InternalServerServiceFactory>> = let services: Vec<Box<InternalServerServiceFactory>> =
self.services.iter().map(|v| v.clone_factory()).collect(); self.services.iter().map(|v| v.clone_factory()).collect();
Arbiter::new(format!("actix-worker-{}", idx)).do_send(Execute::new(|| { Arbiter::new(format!("actix-net-worker-{}", idx)).do_send(Execute::new(|| {
Worker::start(rx, services, avail); Worker::start(rx, services, avail);
Ok::<_, ()>(()) Ok::<_, ()>(())
})); }));

View File

@ -66,6 +66,7 @@ where
} }
pub(crate) struct ServerNewService<F: ServerServiceFactory> { pub(crate) struct ServerNewService<F: ServerServiceFactory> {
name: String,
inner: F, inner: F,
} }
@ -73,12 +74,14 @@ impl<F> ServerNewService<F>
where where
F: ServerServiceFactory, F: ServerServiceFactory,
{ {
pub(crate) fn create(inner: F) -> Box<InternalServerServiceFactory> { pub(crate) fn create(name: String, inner: F) -> Box<InternalServerServiceFactory> {
Box::new(Self { inner }) Box::new(Self { name, inner })
} }
} }
pub(crate) trait InternalServerServiceFactory: Send { pub(crate) trait InternalServerServiceFactory: Send {
fn name(&self) -> &str;
fn clone_factory(&self) -> Box<InternalServerServiceFactory>; fn clone_factory(&self) -> Box<InternalServerServiceFactory>;
fn create(&self) -> Box<Future<Item = BoxedServerService, Error = ()>>; fn create(&self) -> Box<Future<Item = BoxedServerService, Error = ()>>;
@ -88,8 +91,13 @@ impl<F> InternalServerServiceFactory for ServerNewService<F>
where where
F: ServerServiceFactory, F: ServerServiceFactory,
{ {
fn name(&self) -> &str {
&self.name
}
fn clone_factory(&self) -> Box<InternalServerServiceFactory> { fn clone_factory(&self) -> Box<InternalServerServiceFactory> {
Box::new(Self { Box::new(Self {
name: self.name.clone(),
inner: self.inner.clone(), inner: self.inner.clone(),
}) })
} }
@ -103,6 +111,10 @@ where
} }
impl InternalServerServiceFactory for Box<InternalServerServiceFactory> { impl InternalServerServiceFactory for Box<InternalServerServiceFactory> {
fn name(&self) -> &str {
self.as_ref().name()
}
fn clone_factory(&self) -> Box<InternalServerServiceFactory> { fn clone_factory(&self) -> Box<InternalServerServiceFactory> {
self.as_ref().clone_factory() self.as_ref().clone_factory()
} }

View File

@ -112,9 +112,9 @@ impl WorkerAvailability {
} }
} }
/// Http worker /// Service worker
/// ///
/// Worker accepts Socket objects via unbounded channel and start requests /// Worker accepts Socket objects via unbounded channel and starts stream
/// processing. /// processing.
pub(crate) struct Worker { pub(crate) struct Worker {
rx: UnboundedReceiver<WorkerCommand>, rx: UnboundedReceiver<WorkerCommand>,
@ -168,15 +168,22 @@ impl Worker {
} }
} }
fn check_readiness(&mut self) -> Result<bool, usize> { fn check_readiness(&mut self, trace: bool) -> Result<bool, usize> {
let mut ready = self.conns.available(); let mut ready = self.conns.available();
let mut failed = None; let mut failed = None;
for (idx, service) in self.services.iter_mut().enumerate() { for (idx, service) in self.services.iter_mut().enumerate() {
match service.poll_ready() { match service.poll_ready() {
Ok(Async::Ready(_)) => (), Ok(Async::Ready(_)) => {
if trace {
trace!("Service {:?} is available", self.factories[idx].name());
}
}
Ok(Async::NotReady) => ready = false, Ok(Async::NotReady) => ready = false,
Err(_) => { Err(_) => {
error!("Service readiness check returned error, restarting"); error!(
"Service {:?} readiness check returned error, restarting",
self.factories[idx].name()
);
failed = Some(idx); failed = Some(idx);
} }
} }
@ -206,14 +213,13 @@ impl Future for Worker {
match state { match state {
WorkerState::Unavailable(mut conns) => { WorkerState::Unavailable(mut conns) => {
match self.check_readiness() { match self.check_readiness(true) {
Ok(true) => { Ok(true) => {
trace!("Serveice is available");
self.state = WorkerState::Available; self.state = WorkerState::Available;
// process requests from wait queue // process requests from wait queue
while let Some(msg) = conns.pop() { while let Some(msg) = conns.pop() {
match self.check_readiness() { match self.check_readiness(false) {
Ok(true) => { Ok(true) => {
let guard = self.conns.get(); let guard = self.conns.get();
spawn( spawn(
@ -226,12 +232,15 @@ impl Future for Worker {
) )
} }
Ok(false) => { Ok(false) => {
trace!("Serveice is unavailable"); trace!("Worker is unavailable");
self.state = WorkerState::Unavailable(conns); self.state = WorkerState::Unavailable(conns);
return self.poll(); return self.poll();
} }
Err(idx) => { Err(idx) => {
trace!("Serveice failed, restarting"); trace!(
"Service {:?} failed, restarting",
self.factories[idx].name()
);
self.state = WorkerState::Restarting( self.state = WorkerState::Restarting(
idx, idx,
self.factories[idx].create(), self.factories[idx].create(),
@ -248,7 +257,10 @@ impl Future for Worker {
return Ok(Async::NotReady); return Ok(Async::NotReady);
} }
Err(idx) => { Err(idx) => {
trace!("Serveice failed, restarting"); trace!(
"Service {:?} failed, restarting",
self.factories[idx].name()
);
self.state = WorkerState::Restarting(idx, self.factories[idx].create()); self.state = WorkerState::Restarting(idx, self.factories[idx].create());
return self.poll(); return self.poll();
} }
@ -257,7 +269,10 @@ impl Future for Worker {
WorkerState::Restarting(idx, mut fut) => { WorkerState::Restarting(idx, mut fut) => {
match fut.poll() { match fut.poll() {
Ok(Async::Ready(service)) => { Ok(Async::Ready(service)) => {
trace!("Service has been restarted"); trace!(
"Service {:?} has been restarted",
self.factories[idx].name()
);
self.services[idx] = service; self.services[idx] = service;
self.state = WorkerState::Unavailable(Vec::new()); self.state = WorkerState::Unavailable(Vec::new());
} }
@ -266,7 +281,7 @@ impl Future for Worker {
return Ok(Async::NotReady); return Ok(Async::NotReady);
} }
Err(_) => { Err(_) => {
panic!("Can not restart service"); panic!("Can not restart {:?} service", self.factories[idx].name());
} }
} }
return self.poll(); return self.poll();
@ -306,7 +321,7 @@ impl Future for Worker {
match self.rx.poll() { match self.rx.poll() {
// handle incoming tcp stream // handle incoming tcp stream
Ok(Async::Ready(Some(WorkerCommand::Message(msg)))) => { Ok(Async::Ready(Some(WorkerCommand::Message(msg)))) => {
match self.check_readiness() { match self.check_readiness(false) {
Ok(true) => { Ok(true) => {
let guard = self.conns.get(); let guard = self.conns.get();
spawn( spawn(
@ -320,12 +335,15 @@ impl Future for Worker {
continue; continue;
} }
Ok(false) => { Ok(false) => {
trace!("Serveice is unsavailable"); trace!("Worker is unavailable");
self.availability.set(false); self.availability.set(false);
self.state = WorkerState::Unavailable(vec![msg]); self.state = WorkerState::Unavailable(vec![msg]);
} }
Err(idx) => { Err(idx) => {
trace!("Serveice failed, restarting"); trace!(
"Service {:?} failed, restarting",
self.factories[idx].name()
);
self.availability.set(false); self.availability.set(false);
self.state = WorkerState::Restarting( self.state = WorkerState::Restarting(
idx, idx,
@ -340,14 +358,14 @@ impl Future for Worker {
self.availability.set(false); self.availability.set(false);
let num = num_connections(); let num = num_connections();
if num == 0 { if num == 0 {
info!("Shutting down http worker, 0 connections"); info!("Shutting down worker, 0 connections");
let _ = tx.send(true); let _ = tx.send(true);
return Ok(Async::Ready(())); return Ok(Async::Ready(()));
} else if let Some(dur) = graceful { } else if let Some(dur) = graceful {
self.shutdown(false); self.shutdown(false);
let num = num_connections(); let num = num_connections();
if num != 0 { if num != 0 {
info!("Graceful http worker shutdown, {} connections", num); info!("Graceful worker shutdown, {} connections", num);
break Some(WorkerState::Shutdown( break Some(WorkerState::Shutdown(
sleep(time::Duration::from_secs(1)), sleep(time::Duration::from_secs(1)),
sleep(dur), sleep(dur),
@ -358,7 +376,7 @@ impl Future for Worker {
return Ok(Async::Ready(())); return Ok(Async::Ready(()));
} }
} else { } else {
info!("Force shutdown http worker, {} connections", num); info!("Force shutdown worker, {} connections", num);
self.shutdown(true); self.shutdown(true);
let _ = tx.send(false); let _ = tx.send(false);
return Ok(Async::Ready(())); return Ok(Async::Ready(()));