1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-28 05:22:39 +01:00
actix-net/src/server.rs

561 lines
16 KiB
Rust
Raw Normal View History

2018-08-19 19:47:04 +02:00
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use std::time::Duration;
use std::{fmt, io, mem, net};
use futures::sync::{mpsc, mpsc::unbounded};
use futures::{Future, Sink, Stream};
use net2::TcpBuilder;
use num_cpus;
use tokio_tcp::TcpStream;
use actix::{
actors::signal, fut, Actor, ActorFuture, Addr, Arbiter, AsyncContext, Context, Handler,
Response, StreamHandler, System, WrapFuture,
};
use super::accept::{AcceptLoop, AcceptNotify, Command};
2018-08-22 02:08:23 +02:00
use super::server_config::{Config, ServerConfig};
2018-08-19 19:47:04 +02:00
use super::server_service::{ServerNewService, ServerServiceFactory};
2018-08-22 02:08:23 +02:00
use super::service::{IntoNewService, NewService};
2018-08-19 19:47:04 +02:00
use super::worker::{Conn, StopWorker, Worker, WorkerClient};
use super::{PauseServer, ResumeServer, StopServer, Token};
pub(crate) enum ServerCommand {
WorkerDied(usize),
}
/// Server
2018-08-22 02:08:23 +02:00
pub struct Server<C = ServerConfig> {
config: C,
2018-08-19 19:47:04 +02:00
threads: usize,
workers: Vec<(usize, Addr<Worker>)>,
2018-08-22 02:08:23 +02:00
services: Vec<Box<ServerServiceFactory<C> + Send>>,
2018-08-19 19:47:04 +02:00
sockets: Vec<(Token, net::TcpListener)>,
accept: AcceptLoop,
exit: bool,
shutdown_timeout: u16,
signals: Option<Addr<signal::ProcessSignals>>,
no_signals: bool,
maxconn: usize,
maxconnrate: usize,
}
2018-08-22 02:08:23 +02:00
impl Default for Server<ServerConfig> {
2018-08-19 19:47:04 +02:00
fn default() -> Self {
2018-08-22 02:08:23 +02:00
Self::new(ServerConfig::default())
2018-08-19 19:47:04 +02:00
}
}
2018-08-22 02:08:23 +02:00
impl<C: Config> Server<C> {
2018-08-19 19:47:04 +02:00
/// Create new Server instance
2018-08-22 02:08:23 +02:00
pub fn new(config: C) -> Server<C> {
2018-08-19 19:47:04 +02:00
Server {
2018-08-22 02:08:23 +02:00
config,
2018-08-19 19:47:04 +02:00
threads: num_cpus::get(),
workers: Vec::new(),
services: Vec::new(),
sockets: Vec::new(),
accept: AcceptLoop::new(),
exit: false,
shutdown_timeout: 30,
signals: None,
no_signals: false,
maxconn: 102_400,
maxconnrate: 256,
}
}
/// Set number of workers to start.
///
2018-08-22 06:11:16 +02:00
/// By default server uses number of available logical cpu as threads
2018-08-19 19:47:04 +02:00
/// count.
pub fn workers(mut self, num: usize) -> Self {
self.threads = num;
self
}
/// Sets the maximum per-worker number of concurrent connections.
///
/// All socket listeners will stop accepting connections when this limit is
/// reached for each worker.
///
/// By default max connections is set to a 100k.
2018-08-22 02:08:23 +02:00
pub fn maxconn(mut self, num: usize) -> Self
where
C: AsMut<Connections>,
{
2018-08-19 19:47:04 +02:00
self.maxconn = num;
self
}
/// Sets the maximum per-worker concurrent connection establish process.
///
/// All listeners will stop accepting connections when this limit is
/// reached. It can be used to limit the global SSL CPU usage.
///
/// By default max connections is set to a 256.
pub fn maxconnrate(mut self, num: usize) -> Self {
self.maxconnrate = num;
self
}
/// Stop actix system.
///
/// `SystemExit` message stops currently running system.
pub fn system_exit(mut self) -> Self {
self.exit = true;
self
}
#[doc(hidden)]
/// Set alternative address for `ProcessSignals` actor.
pub fn signals(mut self, addr: Addr<signal::ProcessSignals>) -> Self {
self.signals = Some(addr);
self
}
/// Disable signal handling
pub fn disable_signals(mut self) -> Self {
self.no_signals = true;
self
}
/// Timeout for graceful workers shutdown.
///
/// After receiving a stop signal, workers have this much time to finish
/// serving requests. Workers still alive after the timeout are force
/// dropped.
///
/// By default shutdown timeout sets to 30 seconds.
pub fn shutdown_timeout(mut self, sec: u16) -> Self {
self.shutdown_timeout = sec;
self
}
/// Add new service to server
pub fn bind<T, U, N>(mut self, addr: U, srv: T) -> io::Result<Self>
where
U: net::ToSocketAddrs,
T: IntoNewService<N> + Clone,
2018-08-22 02:08:23 +02:00
N: NewService<Request = TcpStream, Response = (), Config = C, InitError = io::Error>
2018-08-19 19:47:04 +02:00
+ Clone
+ Send
+ 'static,
N::Service: 'static,
N::Future: 'static,
N::Error: fmt::Display,
{
let sockets = bind_addr(addr)?;
for lst in sockets {
self = self.listen(lst, srv.clone())
}
Ok(self)
}
/// Add new service to server
pub fn listen<T, N>(mut self, lst: net::TcpListener, srv: T) -> Self
where
T: IntoNewService<N>,
2018-08-22 02:08:23 +02:00
N: NewService<Request = TcpStream, Response = (), Config = C, InitError = io::Error>
2018-08-19 19:47:04 +02:00
+ Clone
+ Send
+ 'static,
N::Service: 'static,
N::Future: 'static,
N::Error: fmt::Display,
{
let token = Token(self.services.len());
2018-08-22 02:08:23 +02:00
self.services.push(ServerNewService::create(
srv.into_new_service(),
self.config.clone(),
));
2018-08-19 19:47:04 +02:00
self.sockets.push((token, lst));
self
}
/// Spawn new thread and start listening for incoming connections.
///
/// This method spawns new thread and starts new actix system. Other than
/// that it is similar to `start()` method. This method blocks.
///
/// This methods panics if no socket addresses get bound.
///
/// ```rust,ignore
/// # extern crate futures;
/// # extern crate actix_web;
/// # use futures::Future;
/// use actix_web::*;
///
/// fn main() {
/// Server::new().
/// .service(
/// HttpServer::new(|| App::new().resource("/", |r| r.h(|_| HttpResponse::Ok())))
/// .bind("127.0.0.1:0")
/// .expect("Can not bind to 127.0.0.1:0"))
/// .run();
/// }
/// ```
pub fn run(self) {
let sys = System::new("http-server");
self.start();
sys.run();
}
/// Starts Server Actor and returns its address
2018-08-22 02:08:23 +02:00
pub fn start(mut self) -> Addr<Server<C>> {
2018-08-19 19:47:04 +02:00
if self.sockets.is_empty() {
panic!("Service should have at least one bound socket");
} else {
info!("Starting {} http workers", self.threads);
// start workers
let mut workers = Vec::new();
for idx in 0..self.threads {
let (addr, worker) = self.start_worker(idx, self.accept.get_notify());
workers.push(worker);
self.workers.push((idx, addr));
}
// start accept thread
for sock in &self.sockets {
info!("Starting server on http://{:?}", sock.1.local_addr().ok());
}
let rx = self
.accept
.start(mem::replace(&mut self.sockets, Vec::new()), workers);
// start http server actor
let signals = self.subscribe_to_signals();
let addr = Actor::create(move |ctx| {
ctx.add_stream(rx);
self
});
if let Some(signals) = signals {
signals.do_send(signal::Subscribe(addr.clone().recipient()))
}
addr
}
}
// subscribe to os signals
fn subscribe_to_signals(&self) -> Option<Addr<signal::ProcessSignals>> {
if !self.no_signals {
if let Some(ref signals) = self.signals {
Some(signals.clone())
} else {
Some(System::current().registry().get::<signal::ProcessSignals>())
}
} else {
None
}
}
fn start_worker(&self, idx: usize, notify: AcceptNotify) -> (Addr<Worker>, WorkerClient) {
let (tx, rx) = unbounded::<Conn>();
let conns = Connections::new(notify, self.maxconn, self.maxconnrate);
let worker = WorkerClient::new(idx, tx, conns.clone());
2018-08-22 02:08:23 +02:00
let services: Vec<Box<ServerServiceFactory<C> + Send>> =
2018-08-19 19:47:04 +02:00
self.services.iter().map(|v| v.clone_factory()).collect();
let addr = Arbiter::start(move |ctx: &mut Context<_>| {
ctx.add_message_stream(rx);
Worker::new(ctx, services)
});
(addr, worker)
}
}
2018-08-22 02:08:23 +02:00
impl<C: Config> Actor for Server<C> {
2018-08-19 19:47:04 +02:00
type Context = Context<Self>;
}
/// Signals support
/// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system
/// message to `System` actor.
2018-08-22 02:08:23 +02:00
impl<C: Config> Handler<signal::Signal> for Server<C> {
2018-08-19 19:47:04 +02:00
type Result = ();
fn handle(&mut self, msg: signal::Signal, ctx: &mut Context<Self>) {
match msg.0 {
signal::SignalType::Int => {
info!("SIGINT received, exiting");
self.exit = true;
Handler::<StopServer>::handle(self, StopServer { graceful: false }, ctx);
}
signal::SignalType::Term => {
info!("SIGTERM received, stopping");
self.exit = true;
Handler::<StopServer>::handle(self, StopServer { graceful: true }, ctx);
}
signal::SignalType::Quit => {
info!("SIGQUIT received, exiting");
self.exit = true;
Handler::<StopServer>::handle(self, StopServer { graceful: false }, ctx);
}
_ => (),
}
}
}
2018-08-22 02:08:23 +02:00
impl<C: Config> Handler<PauseServer> for Server<C> {
2018-08-19 19:47:04 +02:00
type Result = ();
fn handle(&mut self, _: PauseServer, _: &mut Context<Self>) {
self.accept.send(Command::Pause);
}
}
2018-08-22 02:08:23 +02:00
impl<C: Config> Handler<ResumeServer> for Server<C> {
2018-08-19 19:47:04 +02:00
type Result = ();
fn handle(&mut self, _: ResumeServer, _: &mut Context<Self>) {
self.accept.send(Command::Resume);
}
}
2018-08-22 02:08:23 +02:00
impl<C: Config> Handler<StopServer> for Server<C> {
2018-08-19 19:47:04 +02:00
type Result = Response<(), ()>;
fn handle(&mut self, msg: StopServer, ctx: &mut Context<Self>) -> Self::Result {
// stop accept thread
self.accept.send(Command::Stop);
// stop workers
let (tx, rx) = mpsc::channel(1);
let dur = if msg.graceful {
Some(Duration::new(u64::from(self.shutdown_timeout), 0))
} else {
None
};
for worker in &self.workers {
let tx2 = tx.clone();
ctx.spawn(
worker
.1
.send(StopWorker { graceful: dur })
.into_actor(self)
.then(move |_, slf, ctx| {
slf.workers.pop();
if slf.workers.is_empty() {
let _ = tx2.send(());
// we need to stop system if server was spawned
if slf.exit {
ctx.run_later(Duration::from_millis(300), |_, _| {
System::current().stop();
});
}
}
fut::ok(())
}),
);
}
if !self.workers.is_empty() {
Response::async(rx.into_future().map(|_| ()).map_err(|_| ()))
} else {
// we need to stop system if server was spawned
if self.exit {
ctx.run_later(Duration::from_millis(300), |_, _| {
System::current().stop();
});
}
Response::reply(Ok(()))
}
}
}
/// Commands from accept threads
2018-08-22 02:08:23 +02:00
impl<C: Config> StreamHandler<ServerCommand, ()> for Server<C> {
2018-08-19 19:47:04 +02:00
fn finished(&mut self, _: &mut Context<Self>) {}
fn handle(&mut self, msg: ServerCommand, _: &mut Context<Self>) {
match msg {
ServerCommand::WorkerDied(idx) => {
let mut found = false;
for i in 0..self.workers.len() {
if self.workers[i].0 == idx {
self.workers.swap_remove(i);
found = true;
break;
}
}
if found {
error!("Worker has died {:?}, restarting", idx);
let mut new_idx = self.workers.len();
'found: loop {
for i in 0..self.workers.len() {
if self.workers[i].0 == new_idx {
new_idx += 1;
continue 'found;
}
}
break;
}
let (addr, worker) = self.start_worker(new_idx, self.accept.get_notify());
self.workers.push((new_idx, addr));
self.accept.send(Command::Worker(worker));
}
}
}
}
}
#[derive(Clone, Default)]
/// Contains information about connection.
pub struct Connections(Arc<ConnectionsInner>);
impl Connections {
fn new(notify: AcceptNotify, maxconn: usize, maxconnrate: usize) -> Self {
let maxconn_low = if maxconn > 10 { maxconn - 10 } else { 0 };
let maxconnrate_low = if maxconnrate > 10 {
maxconnrate - 10
} else {
0
};
Connections(Arc::new(ConnectionsInner {
notify,
maxconn,
maxconnrate,
maxconn_low,
maxconnrate_low,
conn: AtomicUsize::new(0),
connrate: AtomicUsize::new(0),
}))
}
pub(crate) fn available(&self) -> bool {
self.0.available()
}
/// Report opened connection
pub fn connection(&self) -> ConnectionTag {
ConnectionTag::new(self.0.clone())
}
/// Report rate connection, rate is usually ssl handshake
pub fn connection_rate(&self) -> ConnectionRateTag {
ConnectionRateTag::new(self.0.clone())
}
}
#[derive(Default)]
struct ConnectionsInner {
notify: AcceptNotify,
conn: AtomicUsize,
connrate: AtomicUsize,
maxconn: usize,
maxconnrate: usize,
maxconn_low: usize,
maxconnrate_low: usize,
}
impl ConnectionsInner {
fn available(&self) -> bool {
if self.maxconnrate <= self.connrate.load(Ordering::Relaxed) {
false
} else {
self.maxconn > self.conn.load(Ordering::Relaxed)
}
}
fn notify_maxconn(&self, maxconn: usize) {
if maxconn > self.maxconn_low && maxconn <= self.maxconn {
self.notify.notify();
}
}
fn notify_maxconnrate(&self, connrate: usize) {
if connrate > self.maxconnrate_low && connrate <= self.maxconnrate {
self.notify.notify();
}
}
}
/// Type responsible for max connection stat.
///
/// Max connections stat get updated on drop.
pub struct ConnectionTag(Arc<ConnectionsInner>);
impl ConnectionTag {
fn new(inner: Arc<ConnectionsInner>) -> Self {
inner.conn.fetch_add(1, Ordering::Relaxed);
ConnectionTag(inner)
}
}
impl Drop for ConnectionTag {
fn drop(&mut self) {
let conn = self.0.conn.fetch_sub(1, Ordering::Relaxed);
self.0.notify_maxconn(conn);
}
}
/// Type responsible for max connection rate stat.
///
/// Max connections rate stat get updated on drop.
pub struct ConnectionRateTag(Arc<ConnectionsInner>);
impl ConnectionRateTag {
fn new(inner: Arc<ConnectionsInner>) -> Self {
inner.connrate.fetch_add(1, Ordering::Relaxed);
ConnectionRateTag(inner)
}
}
impl Drop for ConnectionRateTag {
fn drop(&mut self) {
let connrate = self.0.connrate.fetch_sub(1, Ordering::Relaxed);
self.0.notify_maxconnrate(connrate);
}
}
fn bind_addr<S: net::ToSocketAddrs>(addr: S) -> io::Result<Vec<net::TcpListener>> {
let mut err = None;
let mut succ = false;
let mut sockets = Vec::new();
for addr in addr.to_socket_addrs()? {
match create_tcp_listener(addr) {
Ok(lst) => {
succ = true;
sockets.push(lst);
}
Err(e) => err = Some(e),
}
}
if !succ {
if let Some(e) = err.take() {
Err(e)
} else {
Err(io::Error::new(
io::ErrorKind::Other,
"Can not bind to address.",
))
}
} else {
Ok(sockets)
}
}
fn create_tcp_listener(addr: net::SocketAddr) -> io::Result<net::TcpListener> {
let builder = match addr {
net::SocketAddr::V4(_) => TcpBuilder::new_v4()?,
net::SocketAddr::V6(_) => TcpBuilder::new_v6()?,
};
builder.reuse_address(true)?;
builder.bind(addr)?;
Ok(builder.listen(1024)?)
}