1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-01-18 13:51:50 +01:00

refactor workers management

This commit is contained in:
Nikolay Kim 2018-08-09 11:52:32 -07:00
parent cfe4829a56
commit e4ce6dfbdf
9 changed files with 1061 additions and 840 deletions

View File

@ -9,8 +9,9 @@ use tokio_timer::Delay;
use actix::{msgs::Execute, Arbiter, System};
use super::srv::ServerCommand;
use super::worker::{Conn, Socket, Token, WorkerClient};
use super::server::ServerCommand;
use super::worker::{Conn, WorkerClient};
use super::Token;
pub(crate) enum Command {
Pause,
@ -22,51 +23,27 @@ pub(crate) enum Command {
struct ServerSocketInfo {
addr: net::SocketAddr,
token: Token,
handler: Token,
sock: mio::net::TcpListener,
timeout: Option<Instant>,
}
#[derive(Clone)]
pub(crate) struct AcceptNotify {
ready: mio::SetReadiness,
maxconn: usize,
maxconn_low: usize,
maxconnrate: usize,
maxconnrate_low: usize,
}
pub(crate) struct AcceptNotify(mio::SetReadiness);
impl AcceptNotify {
pub fn new(ready: mio::SetReadiness, maxconn: usize, maxconnrate: usize) -> Self {
let maxconn_low = if maxconn > 10 { maxconn - 10 } else { 0 };
let maxconnrate_low = if maxconnrate > 10 {
maxconnrate - 10
} else {
0
};
AcceptNotify {
ready,
maxconn,
maxconn_low,
maxconnrate,
maxconnrate_low,
}
pub(crate) fn new(ready: mio::SetReadiness) -> Self {
AcceptNotify(ready)
}
pub fn notify_maxconn(&self, maxconn: usize) {
if maxconn > self.maxconn_low && maxconn <= self.maxconn {
let _ = self.ready.set_readiness(mio::Ready::readable());
}
}
pub fn notify_maxconnrate(&self, connrate: usize) {
if connrate > self.maxconnrate_low && connrate <= self.maxconnrate {
let _ = self.ready.set_readiness(mio::Ready::readable());
}
pub(crate) fn notify(&self) {
let _ = self.0.set_readiness(mio::Ready::readable());
}
}
impl Default for AcceptNotify {
fn default() -> Self {
AcceptNotify::new(mio::Registration::new2().1, 0, 0)
AcceptNotify::new(mio::Registration::new2().1)
}
}
@ -81,8 +58,6 @@ pub(crate) struct AcceptLoop {
mpsc::UnboundedSender<ServerCommand>,
mpsc::UnboundedReceiver<ServerCommand>,
)>,
maxconn: usize,
maxconnrate: usize,
}
impl AcceptLoop {
@ -97,8 +72,6 @@ impl AcceptLoop {
cmd_reg: Some(cmd_reg),
notify_ready,
notify_reg: Some(notify_reg),
maxconn: 102_400,
maxconnrate: 256,
rx: Some(rx),
srv: Some(mpsc::unbounded()),
}
@ -110,19 +83,12 @@ impl AcceptLoop {
}
pub fn get_notify(&self) -> AcceptNotify {
AcceptNotify::new(self.notify_ready.clone(), self.maxconn, self.maxconnrate)
}
pub fn maxconn(&mut self, num: usize) {
self.maxconn = num;
}
pub fn maxconnrate(&mut self, num: usize) {
self.maxconnrate = num;
AcceptNotify::new(self.notify_ready.clone())
}
pub(crate) fn start(
&mut self, socks: Vec<Socket>, workers: Vec<WorkerClient>,
&mut self, socks: Vec<Vec<(Token, net::TcpListener)>>,
workers: Vec<WorkerClient>,
) -> mpsc::UnboundedReceiver<ServerCommand> {
let (tx, rx) = self.srv.take().expect("Can not re-use AcceptInfo");
@ -130,8 +96,6 @@ impl AcceptLoop {
self.rx.take().expect("Can not re-use AcceptInfo"),
self.cmd_reg.take().expect("Can not re-use AcceptInfo"),
self.notify_reg.take().expect("Can not re-use AcceptInfo"),
self.maxconn,
self.maxconnrate,
socks,
tx,
workers,
@ -148,8 +112,6 @@ struct Accept {
srv: mpsc::UnboundedSender<ServerCommand>,
timer: (mio::Registration, mio::SetReadiness),
next: usize,
maxconn: usize,
maxconnrate: usize,
backpressure: bool,
}
@ -175,9 +137,8 @@ impl Accept {
#![cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
pub(crate) fn start(
rx: sync_mpsc::Receiver<Command>, cmd_reg: mio::Registration,
notify_reg: mio::Registration, maxconn: usize, maxconnrate: usize,
socks: Vec<Socket>, srv: mpsc::UnboundedSender<ServerCommand>,
workers: Vec<WorkerClient>,
notify_reg: mio::Registration, socks: Vec<Vec<(Token, net::TcpListener)>>,
srv: mpsc::UnboundedSender<ServerCommand>, workers: Vec<WorkerClient>,
) {
let sys = System::current();
@ -187,8 +148,6 @@ impl Accept {
.spawn(move || {
System::set_current(sys);
let mut accept = Accept::new(rx, socks, workers, srv);
accept.maxconn = maxconn;
accept.maxconnrate = maxconnrate;
// Start listening for incoming commands
if let Err(err) = accept.poll.register(
@ -215,7 +174,7 @@ impl Accept {
}
fn new(
rx: sync_mpsc::Receiver<Command>, socks: Vec<Socket>,
rx: sync_mpsc::Receiver<Command>, socks: Vec<Vec<(Token, net::TcpListener)>>,
workers: Vec<WorkerClient>, srv: mpsc::UnboundedSender<ServerCommand>,
) -> Accept {
// Create a poll instance
@ -226,29 +185,33 @@ impl Accept {
// Start accept
let mut sockets = Slab::new();
for sock in socks {
let server = mio::net::TcpListener::from_std(sock.lst)
.expect("Can not create mio::net::TcpListener");
for (idx, srv_socks) in socks.into_iter().enumerate() {
for (hnd_token, lst) in srv_socks {
let addr = lst.local_addr().unwrap();
let server = mio::net::TcpListener::from_std(lst)
.expect("Can not create mio::net::TcpListener");
let entry = sockets.vacant_entry();
let token = entry.key();
let entry = sockets.vacant_entry();
let token = entry.key();
// Start listening for incoming connections
if let Err(err) = poll.register(
&server,
mio::Token(token + DELTA),
mio::Ready::readable(),
mio::PollOpt::edge(),
) {
panic!("Can not register io: {}", err);
// Start listening for incoming connections
if let Err(err) = poll.register(
&server,
mio::Token(token + DELTA),
mio::Ready::readable(),
mio::PollOpt::edge(),
) {
panic!("Can not register io: {}", err);
}
entry.insert(ServerSocketInfo {
addr,
token: hnd_token,
handler: Token(idx),
sock: server,
timeout: None,
});
}
entry.insert(ServerSocketInfo {
token: sock.token,
addr: sock.addr,
sock: server,
timeout: None,
});
}
// Timer
@ -267,8 +230,6 @@ impl Accept {
srv,
next: 0,
timer: (tm, tmr),
maxconn: 102_400,
maxconnrate: 256,
backpressure: false,
}
}
@ -431,7 +392,7 @@ impl Accept {
let mut idx = 0;
while idx < self.workers.len() {
idx += 1;
if self.workers[self.next].available(self.maxconn, self.maxconnrate) {
if self.workers[self.next].available() {
match self.workers[self.next].send(msg) {
Ok(_) => {
self.next = (self.next + 1) % self.workers.len();
@ -469,6 +430,7 @@ impl Accept {
Ok((io, addr)) => Conn {
io,
token: info.token,
handler: info.handler,
peer: Some(addr),
},
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return,
@ -489,11 +451,10 @@ impl Accept {
Delay::new(
Instant::now() + Duration::from_millis(510),
).map_err(|_| ())
.and_then(move |_| {
let _ =
r.set_readiness(mio::Ready::readable());
Ok(())
}),
.and_then(move |_| {
let _ = r.set_readiness(mio::Ready::readable());
Ok(())
}),
);
Ok(())
},

View File

@ -7,7 +7,7 @@ use futures::{Async, Future, Poll};
use tokio_io::{AsyncRead, AsyncWrite};
use super::settings::WorkerSettings;
use super::{h1, h2, HttpHandler, IoStream};
use super::{h1, h2, ConnectionTag, HttpHandler, IoStream};
const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0";
@ -30,6 +30,7 @@ where
{
proto: Option<HttpProtocol<T, H>>,
node: Option<Node<HttpChannel<T, H>>>,
_tag: ConnectionTag,
}
impl<T, H> HttpChannel<T, H>
@ -40,9 +41,10 @@ where
pub(crate) fn new(
settings: Rc<WorkerSettings<H>>, io: T, peer: Option<SocketAddr>,
) -> HttpChannel<T, H> {
settings.add_channel();
let _tag = settings.connection();
HttpChannel {
_tag,
node: None,
proto: Some(HttpProtocol::Unknown(
settings,
@ -97,7 +99,6 @@ where
let result = h1.poll();
match result {
Ok(Async::Ready(())) | Err(_) => {
h1.settings().remove_channel();
if let Some(n) = self.node.as_mut() {
n.remove()
};
@ -110,7 +111,6 @@ where
let result = h2.poll();
match result {
Ok(Async::Ready(())) | Err(_) => {
h2.settings().remove_channel();
if let Some(n) = self.node.as_mut() {
n.remove()
};
@ -119,16 +119,10 @@ where
}
return result;
}
Some(HttpProtocol::Unknown(
ref mut settings,
_,
ref mut io,
ref mut buf,
)) => {
Some(HttpProtocol::Unknown(_, _, ref mut io, ref mut buf)) => {
match io.read_available(buf) {
Ok(Async::Ready(true)) | Err(_) => {
debug!("Ignored premature client disconnection");
settings.remove_channel();
if let Some(n) = self.node.as_mut() {
n.remove()
};

View File

@ -468,7 +468,6 @@ where
#[cfg(test)]
mod tests {
use std::net::Shutdown;
use std::sync::{atomic::AtomicUsize, Arc};
use std::{cmp, io, time};
use bytes::{Buf, Bytes, BytesMut};
@ -478,20 +477,17 @@ mod tests {
use super::*;
use application::HttpApplication;
use httpmessage::HttpMessage;
use server::accept::AcceptNotify;
use server::h1decoder::Message;
use server::settings::{ServerSettings, WorkerSettings};
use server::{KeepAlive, Request};
use server::{Connections, KeepAlive, Request};
fn wrk_settings() -> WorkerSettings<HttpApplication> {
WorkerSettings::<HttpApplication>::new(
fn wrk_settings() -> Rc<WorkerSettings<HttpApplication>> {
Rc::new(WorkerSettings::<HttpApplication>::new(
Vec::new(),
KeepAlive::Os,
ServerSettings::default(),
AcceptNotify::default(),
Arc::new(AtomicUsize::new(0)),
Arc::new(AtomicUsize::new(0)),
)
Connections::default(),
))
}
impl Message {

View File

@ -21,12 +21,16 @@ pub(crate) mod helpers;
pub(crate) mod input;
pub(crate) mod message;
pub(crate) mod output;
mod server;
pub(crate) mod settings;
mod srv;
mod ssl;
mod worker;
pub use self::message::Request;
pub use self::server::{
ConnectionRateTag, ConnectionTag, Connections, Server, Service, ServiceHandler,
};
pub use self::settings::ServerSettings;
pub use self::srv::HttpServer;
pub use self::ssl::*;
@ -136,6 +140,16 @@ impl Message for StopServer {
type Result = Result<(), ()>;
}
/// Socket id token
#[derive(Clone, Copy)]
pub struct Token(usize);
impl Token {
pub(crate) fn new(val: usize) -> Token {
Token(val)
}
}
/// Low level http request handler
#[allow(unused_variables)]
pub trait HttpHandler: 'static {

504
src/server/server.rs Normal file
View File

@ -0,0 +1,504 @@
use std::{mem, net};
use std::time::Duration;
use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
use futures::{Future, Stream, Sink};
use futures::sync::{mpsc, mpsc::unbounded};
use actix::{fut, signal, Actor, ActorFuture, Addr, Arbiter, AsyncContext,
Context, Handler, Response, System, StreamHandler, WrapFuture};
use super::accept::{AcceptLoop, AcceptNotify, Command};
use super::worker::{StopWorker, Worker, WorkerClient, Conn};
use super::{PauseServer, ResumeServer, StopServer, Token};
pub trait Service: Send + 'static {
/// Clone service
fn clone(&self) -> Box<Service>;
/// Create service handler for this service
fn create(&self, conn: Connections) -> Box<ServiceHandler>;
}
impl Service for Box<Service> {
fn clone(&self) -> Box<Service> {
self.as_ref().clone()
}
fn create(&self, conn: Connections) -> Box<ServiceHandler> {
self.as_ref().create(conn)
}
}
pub trait ServiceHandler {
/// Handle incoming stream
fn handle(&mut self, token: Token, io: net::TcpStream, peer: Option<net::SocketAddr>);
/// Shutdown open handlers
fn shutdown(&self, _: bool) {}
}
pub(crate) enum ServerCommand {
WorkerDied(usize),
}
pub struct Server {
threads: usize,
workers: Vec<(usize, Addr<Worker>)>,
services: Vec<Box<Service>>,
sockets: Vec<Vec<(Token, net::TcpListener)>>,
accept: AcceptLoop,
exit: bool,
shutdown_timeout: u16,
signals: Option<Addr<signal::ProcessSignals>>,
no_signals: bool,
maxconn: usize,
maxconnrate: usize,
}
impl Default for Server {
fn default() -> Self {
Self::new()
}
}
impl Server {
/// Create new Server instance
pub fn new() -> Server {
Server {
threads: num_cpus::get(),
workers: Vec::new(),
services: Vec::new(),
sockets: Vec::new(),
accept: AcceptLoop::new(),
exit: false,
shutdown_timeout: 30,
signals: None,
no_signals: false,
maxconn: 102_400,
maxconnrate: 256,
}
}
/// Set number of workers to start.
///
/// By default http server uses number of available logical cpu as threads
/// count.
pub fn workers(mut self, num: usize) -> Self {
self.threads = num;
self
}
/// Sets the maximum per-worker number of concurrent connections.
///
/// All socket listeners will stop accepting connections when this limit is reached
/// for each worker.
///
/// By default max connections is set to a 100k.
pub fn maxconn(mut self, num: usize) -> Self {
self.maxconn = num;
self
}
/// Sets the maximum per-worker concurrent connection establish process.
///
/// All listeners will stop accepting connections when this limit is reached. It
/// can be used to limit the global SSL CPU usage.
///
/// By default max connections is set to a 256.
pub fn maxconnrate(mut self, num: usize) -> Self {
self.maxconnrate= num;
self
}
/// Stop actix system.
///
/// `SystemExit` message stops currently running system.
pub fn system_exit(mut self) -> Self {
self.exit = true;
self
}
#[doc(hidden)]
/// Set alternative address for `ProcessSignals` actor.
pub fn signals(mut self, addr: Addr<signal::ProcessSignals>) -> Self {
self.signals = Some(addr);
self
}
/// Disable signal handling
pub fn disable_signals(mut self) -> Self {
self.no_signals = true;
self
}
/// Timeout for graceful workers shutdown.
///
/// After receiving a stop signal, workers have this much time to finish
/// serving requests. Workers still alive after the timeout are force
/// dropped.
///
/// By default shutdown timeout sets to 30 seconds.
pub fn shutdown_timeout(mut self, sec: u16) -> Self {
self.shutdown_timeout = sec;
self
}
/// Add new service to server
pub fn service<T>(mut self, srv: T, sockets: Vec<(Token, net::TcpListener)>) -> Self
where
T: Into<Box<Service>>
{
self.services.push(srv.into());
self.sockets.push(sockets);
self
}
/// Spawn new thread and start listening for incoming connections.
///
/// This method spawns new thread and starts new actix system. Other than
/// that it is similar to `start()` method. This method blocks.
///
/// This methods panics if no socket addresses get bound.
///
/// ```rust,ignore
/// # extern crate futures;
/// # extern crate actix_web;
/// # use futures::Future;
/// use actix_web::*;
///
/// fn main() {
/// Server::new().
/// .service(
/// HttpServer::new(|| App::new().resource("/", |r| r.h(|_| HttpResponse::Ok())))
/// .bind("127.0.0.1:0")
/// .expect("Can not bind to 127.0.0.1:0"))
/// .run();
/// }
/// ```
pub fn run(self) {
let sys = System::new("http-server");
self.start();
sys.run();
}
/// Start
pub fn start(mut self) -> Addr<Server> {
if self.sockets.is_empty() {
panic!("Service should have at least one bound socket");
} else {
info!("Starting {} http workers", self.threads);
// start workers
let mut workers = Vec::new();
for idx in 0..self.threads {
let (addr, worker) = self.start_worker(idx, self.accept.get_notify());
workers.push(worker);
self.workers.push((idx, addr));
}
// start accept thread
for sock in &self.sockets {
for s in sock.iter() {
info!("Starting server on http://{:?}", s.1.local_addr().ok());
}
}
let rx = self.accept.start(
mem::replace(&mut self.sockets, Vec::new()), workers);
// start http server actor
let signals = self.subscribe_to_signals();
let addr = Actor::create(move |ctx| {
ctx.add_stream(rx);
self
});
if let Some(signals) = signals {
signals.do_send(signal::Subscribe(addr.clone().recipient()))
}
addr
}
}
// subscribe to os signals
fn subscribe_to_signals(&self) -> Option<Addr<signal::ProcessSignals>> {
if !self.no_signals {
if let Some(ref signals) = self.signals {
Some(signals.clone())
} else {
Some(System::current().registry().get::<signal::ProcessSignals>())
}
} else {
None
}
}
fn start_worker(&self, idx: usize, notify: AcceptNotify) -> (Addr<Worker>, WorkerClient) {
let (tx, rx) = unbounded::<Conn<net::TcpStream>>();
let conns = Connections::new(notify, self.maxconn, self.maxconnrate);
let worker = WorkerClient::new(idx, tx, conns.clone());
let services: Vec<_> = self.services.iter().map(|v| v.clone()).collect();
let addr = Arbiter::start(move |ctx: &mut Context<_>| {
ctx.add_message_stream(rx);
let handlers: Vec<_> = services.into_iter().map(|s| s.create(conns.clone())).collect();
Worker::new(conns, handlers)
});
(addr, worker)
}
}
impl Actor for Server
{
type Context = Context<Self>;
}
/// Signals support
/// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system
/// message to `System` actor.
impl Handler<signal::Signal> for Server {
type Result = ();
fn handle(&mut self, msg: signal::Signal, ctx: &mut Context<Self>) {
match msg.0 {
signal::SignalType::Int => {
info!("SIGINT received, exiting");
self.exit = true;
Handler::<StopServer>::handle(self, StopServer { graceful: false }, ctx);
}
signal::SignalType::Term => {
info!("SIGTERM received, stopping");
self.exit = true;
Handler::<StopServer>::handle(self, StopServer { graceful: true }, ctx);
}
signal::SignalType::Quit => {
info!("SIGQUIT received, exiting");
self.exit = true;
Handler::<StopServer>::handle(self, StopServer { graceful: false }, ctx);
}
_ => (),
}
}
}
impl Handler<PauseServer> for Server {
type Result = ();
fn handle(&mut self, _: PauseServer, _: &mut Context<Self>) {
self.accept.send(Command::Pause);
}
}
impl Handler<ResumeServer> for Server {
type Result = ();
fn handle(&mut self, _: ResumeServer, _: &mut Context<Self>) {
self.accept.send(Command::Resume);
}
}
impl Handler<StopServer> for Server {
type Result = Response<(), ()>;
fn handle(&mut self, msg: StopServer, ctx: &mut Context<Self>) -> Self::Result {
// stop accept thread
self.accept.send(Command::Stop);
// stop workers
let (tx, rx) = mpsc::channel(1);
let dur = if msg.graceful {
Some(Duration::new(u64::from(self.shutdown_timeout), 0))
} else {
None
};
for worker in &self.workers {
let tx2 = tx.clone();
ctx.spawn(
worker
.1
.send(StopWorker { graceful: dur })
.into_actor(self)
.then(move |_, slf, ctx| {
slf.workers.pop();
if slf.workers.is_empty() {
let _ = tx2.send(());
// we need to stop system if server was spawned
if slf.exit {
ctx.run_later(Duration::from_millis(300), |_, _| {
System::current().stop();
});
}
}
fut::ok(())
}),
);
}
if !self.workers.is_empty() {
Response::async(rx.into_future().map(|_| ()).map_err(|_| ()))
} else {
// we need to stop system if server was spawned
if self.exit {
ctx.run_later(Duration::from_millis(300), |_, _| {
System::current().stop();
});
}
Response::reply(Ok(()))
}
}
}
/// Commands from accept threads
impl StreamHandler<ServerCommand, ()> for Server {
fn finished(&mut self, _: &mut Context<Self>) {}
fn handle(&mut self, msg: ServerCommand, _: &mut Context<Self>) {
match msg {
ServerCommand::WorkerDied(idx) => {
let mut found = false;
for i in 0..self.workers.len() {
if self.workers[i].0 == idx {
self.workers.swap_remove(i);
found = true;
break;
}
}
if found {
error!("Worker has died {:?}, restarting", idx);
let mut new_idx = self.workers.len();
'found: loop {
for i in 0..self.workers.len() {
if self.workers[i].0 == new_idx {
new_idx += 1;
continue 'found;
}
}
break;
}
let (addr, worker) = self.start_worker(new_idx, self.accept.get_notify());
self.workers.push((new_idx, addr));
self.accept.send(Command::Worker(worker));
}
}
}
}
}
#[derive(Clone, Default)]
pub struct Connections (Arc<ConnectionsInner>);
impl Connections {
fn new(notify: AcceptNotify, maxconn: usize, maxconnrate: usize) -> Self {
let maxconn_low = if maxconn > 10 { maxconn - 10 } else { 0 };
let maxconnrate_low = if maxconnrate > 10 {
maxconnrate - 10
} else {
0
};
Connections (
Arc::new(ConnectionsInner {
notify,
maxconn, maxconnrate,
maxconn_low, maxconnrate_low,
conn: AtomicUsize::new(0),
connrate: AtomicUsize::new(0),
}))
}
pub(crate) fn available(&self) -> bool {
self.0.available()
}
pub(crate) fn num_connections(&self) -> usize {
self.0.conn.load(Ordering::Relaxed)
}
/// Report opened connection
pub fn connection(&self) -> ConnectionTag {
ConnectionTag::new(self.0.clone())
}
/// Report rate connection, rate is usually ssl handshake
pub fn connection_rate(&self) -> ConnectionRateTag {
ConnectionRateTag::new(self.0.clone())
}
}
#[derive(Default)]
struct ConnectionsInner {
notify: AcceptNotify,
conn: AtomicUsize,
connrate: AtomicUsize,
maxconn: usize,
maxconnrate: usize,
maxconn_low: usize,
maxconnrate_low: usize,
}
impl ConnectionsInner {
fn available(&self) -> bool {
if self.maxconnrate <= self.connrate.load(Ordering::Relaxed) {
false
} else {
self.maxconn > self.conn.load(Ordering::Relaxed)
}
}
fn notify_maxconn(&self, maxconn: usize) {
if maxconn > self.maxconn_low && maxconn <= self.maxconn {
self.notify.notify();
}
}
fn notify_maxconnrate(&self, connrate: usize) {
if connrate > self.maxconnrate_low && connrate <= self.maxconnrate {
self.notify.notify();
}
}
}
/// Type responsible for max connection stat.
///
/// Max connections stat get updated on drop.
pub struct ConnectionTag(Arc<ConnectionsInner>);
impl ConnectionTag {
fn new(inner: Arc<ConnectionsInner>) -> Self {
inner.conn.fetch_add(1, Ordering::Relaxed);
ConnectionTag(inner)
}
}
impl Drop for ConnectionTag {
fn drop(&mut self) {
let conn = self.0.conn.fetch_sub(1, Ordering::Relaxed);
self.0.notify_maxconn(conn);
}
}
/// Type responsible for max connection rate stat.
///
/// Max connections rate stat get updated on drop.
pub struct ConnectionRateTag (Arc<ConnectionsInner>);
impl ConnectionRateTag {
fn new(inner: Arc<ConnectionsInner>) -> Self {
inner.connrate.fetch_add(1, Ordering::Relaxed);
ConnectionRateTag(inner)
}
}
impl Drop for ConnectionRateTag {
fn drop(&mut self) {
let connrate = self.0.connrate.fetch_sub(1, Ordering::Relaxed);
self.0.notify_maxconnrate(connrate);
}
}

View File

@ -2,19 +2,22 @@ use std::cell::{RefCell, RefMut, UnsafeCell};
use std::collections::VecDeque;
use std::fmt::Write;
use std::rc::Rc;
use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc};
use std::time::{Duration, Instant};
use std::{env, fmt, net};
use actix::Arbiter;
use bytes::BytesMut;
use futures::Stream;
use futures_cpupool::CpuPool;
use http::StatusCode;
use lazycell::LazyCell;
use parking_lot::Mutex;
use time;
use tokio_timer::Interval;
use super::accept::AcceptNotify;
use super::channel::Node;
use super::message::{Request, RequestPool};
use super::server::{ConnectionRateTag, ConnectionTag, Connections};
use super::KeepAlive;
use body::Body;
use httpresponse::{HttpResponse, HttpResponseBuilder, HttpResponsePool};
@ -137,17 +140,36 @@ pub(crate) struct WorkerSettings<H> {
ka_enabled: bool,
bytes: Rc<SharedBytesPool>,
messages: &'static RequestPool,
channels: Arc<AtomicUsize>,
conns: Connections,
node: RefCell<Node<()>>,
date: UnsafeCell<Date>,
connrate: Arc<AtomicUsize>,
notify: AcceptNotify,
}
impl<H: 'static> WorkerSettings<H> {
pub(crate) fn create(
apps: Vec<H>, keep_alive: KeepAlive, settings: ServerSettings,
conns: Connections,
) -> Rc<WorkerSettings<H>> {
let settings = Rc::new(Self::new(apps, keep_alive, settings, conns));
// periodic date update
let s = settings.clone();
Arbiter::spawn(
Interval::new(Instant::now(), Duration::from_secs(1))
.map_err(|_| ())
.and_then(move |_| {
s.update_date();
Ok(())
}).fold((), |(), _| Ok(())),
);
settings
}
}
impl<H> WorkerSettings<H> {
pub(crate) fn new(
h: Vec<H>, keep_alive: KeepAlive, settings: ServerSettings,
notify: AcceptNotify, channels: Arc<AtomicUsize>, connrate: Arc<AtomicUsize>,
h: Vec<H>, keep_alive: KeepAlive, settings: ServerSettings, conns: Connections,
) -> WorkerSettings<H> {
let (keep_alive, ka_enabled) = match keep_alive {
KeepAlive::Timeout(val) => (val as u64, true),
@ -163,16 +185,10 @@ impl<H> WorkerSettings<H> {
date: UnsafeCell::new(Date::new()),
keep_alive,
ka_enabled,
channels,
connrate,
notify,
conns,
}
}
pub fn num_channels(&self) -> usize {
self.channels.load(Ordering::Relaxed)
}
pub fn head(&self) -> RefMut<Node<()>> {
self.node.borrow_mut()
}
@ -201,16 +217,11 @@ impl<H> WorkerSettings<H> {
RequestPool::get(self.messages)
}
pub fn add_channel(&self) {
self.channels.fetch_add(1, Ordering::Relaxed);
pub fn connection(&self) -> ConnectionTag {
self.conns.connection()
}
pub fn remove_channel(&self) {
let val = self.channels.fetch_sub(1, Ordering::Relaxed);
self.notify.notify_maxconn(val);
}
pub fn update_date(&self) {
fn update_date(&self) {
// Unsafe: WorkerSetting is !Sync and !Send
unsafe { &mut *self.date.get() }.update();
}
@ -230,13 +241,8 @@ impl<H> WorkerSettings<H> {
}
#[allow(dead_code)]
pub(crate) fn conn_rate_add(&self) {
self.connrate.fetch_add(1, Ordering::Relaxed);
}
#[allow(dead_code)]
pub(crate) fn conn_rate_del(&self) {
let val = self.connrate.fetch_sub(1, Ordering::Relaxed);
self.notify.notify_maxconnrate(val);
pub(crate) fn connection_rate(&self) -> ConnectionRateTag {
self.conns.connection_rate()
}
}
@ -309,9 +315,7 @@ mod tests {
Vec::new(),
KeepAlive::Os,
ServerSettings::default(),
AcceptNotify::default(),
Arc::new(AtomicUsize::new(0)),
Arc::new(AtomicUsize::new(0)),
Connections::default(),
);
let mut buf1 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10);
settings.set_date(&mut buf1, true);

View File

@ -1,16 +1,14 @@
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::{atomic::AtomicUsize, Arc};
use std::time::Duration;
use std::{io, net};
use std::sync::Arc;
use std::{io, mem, net, time};
use actix::{
fut, signal, Actor, ActorFuture, Addr, Arbiter, AsyncContext, Context, Handler,
Response, StreamHandler, System, WrapFuture,
};
use actix::{Actor, Addr, Arbiter, AsyncContext, Context, Handler, System};
use futures::sync::mpsc;
use futures::{Future, Sink, Stream};
use futures::{Future, Stream};
use net2::{TcpBuilder, TcpStreamExt};
use num_cpus;
use tokio::executor::current_thread;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_tcp::TcpStream;
@ -23,39 +21,33 @@ use openssl::ssl::SslAcceptorBuilder;
#[cfg(feature = "rust-tls")]
use rustls::ServerConfig;
use super::accept::{AcceptLoop, AcceptNotify, Command};
use super::channel::{HttpChannel, WrapperStream};
use super::server::{Connections, Server, Service, ServiceHandler};
use super::settings::{ServerSettings, WorkerSettings};
use super::worker::{Conn, StopWorker, Token, Worker, WorkerClient, WorkerFactory};
use super::{AcceptorService, IntoHttpHandler, IoStream, KeepAlive};
use super::{PauseServer, ResumeServer, StopServer};
use super::worker::{Conn, Socket};
use super::{
AcceptorService, HttpHandler, IntoAsyncIo, IntoHttpHandler, IoStream, KeepAlive,
Token,
};
/// An HTTP Server
pub struct HttpServer<H>
where
H: IntoHttpHandler + 'static,
{
factory: Arc<Fn() -> Vec<H> + Send + Sync>,
host: Option<String>,
keep_alive: KeepAlive,
backlog: i32,
threads: usize,
factory: WorkerFactory<H>,
workers: Vec<(usize, Addr<Worker>)>,
accept: AcceptLoop,
exit: bool,
shutdown_timeout: u16,
signals: Option<Addr<signal::ProcessSignals>>,
no_http2: bool,
no_signals: bool,
settings: Option<Rc<WorkerSettings<H::Handler>>>,
}
pub(crate) enum ServerCommand {
WorkerDied(usize),
}
impl<H> Actor for HttpServer<H>
where
H: IntoHttpHandler,
{
type Context = Context<Self>;
maxconn: usize,
maxconnrate: usize,
sockets: Vec<Socket>,
handlers: Vec<Box<IoStreamHandler<H::Handler, net::TcpStream>>>,
}
impl<H> HttpServer<H>
@ -72,15 +64,19 @@ where
HttpServer {
threads: num_cpus::get(),
factory: WorkerFactory::new(f),
workers: Vec::new(),
accept: AcceptLoop::new(),
exit: false,
factory: Arc::new(f),
host: None,
backlog: 2048,
keep_alive: KeepAlive::Os,
shutdown_timeout: 30,
signals: None,
exit: true,
no_http2: false,
no_signals: false,
settings: None,
maxconn: 102_400,
maxconnrate: 256,
// settings: None,
sockets: Vec::new(),
handlers: Vec::new(),
}
}
@ -104,7 +100,7 @@ where
///
/// This method should be called before `bind()` method call.
pub fn backlog(mut self, num: i32) -> Self {
self.factory.backlog = num;
self.backlog = num;
self
}
@ -115,7 +111,7 @@ where
///
/// By default max connections is set to a 100k.
pub fn maxconn(mut self, num: usize) -> Self {
self.accept.maxconn(num);
self.maxconn = num;
self
}
@ -126,7 +122,7 @@ where
///
/// By default max connections is set to a 256.
pub fn maxconnrate(mut self, num: usize) -> Self {
self.accept.maxconnrate(num);
self.maxconnrate = num;
self
}
@ -134,7 +130,7 @@ where
///
/// By default keep alive is set to a `Os`.
pub fn keep_alive<T: Into<KeepAlive>>(mut self, val: T) -> Self {
self.factory.keep_alive = val.into();
self.keep_alive = val.into();
self
}
@ -144,7 +140,7 @@ where
/// generation. Check [ConnectionInfo](./dev/struct.ConnectionInfo.
/// html#method.host) documentation for more information.
pub fn server_hostname(mut self, val: String) -> Self {
self.factory.host = Some(val);
self.host = Some(val);
self
}
@ -156,12 +152,6 @@ where
self
}
/// Set alternative address for `ProcessSignals` actor.
pub fn signals(mut self, addr: Addr<signal::ProcessSignals>) -> Self {
self.signals = Some(addr);
self
}
/// Disable signal handling
pub fn disable_signals(mut self) -> Self {
self.no_signals = true;
@ -182,7 +172,10 @@ where
/// Disable `HTTP/2` support
#[doc(hidden)]
#[deprecated(since = "0.7.4", note = "please use acceptor service with proper ServerFlags parama")]
#[deprecated(
since = "0.7.4",
note = "please use acceptor service with proper ServerFlags parama"
)]
pub fn no_http2(mut self) -> Self {
self.no_http2 = true;
self
@ -190,7 +183,7 @@ where
/// Get addresses of bound sockets.
pub fn addrs(&self) -> Vec<net::SocketAddr> {
self.factory.addrs()
self.sockets.iter().map(|s| s.addr).collect()
}
/// Get addresses of bound sockets and the scheme for it.
@ -200,7 +193,10 @@ where
/// and the user should be presented with an enumeration of which
/// socket requires which protocol.
pub fn addrs_with_scheme(&self) -> Vec<(net::SocketAddr, &str)> {
self.factory.addrs_with_scheme()
self.handlers
.iter()
.map(|s| (s.addr(), s.scheme()))
.collect()
}
/// Use listener for accepting incoming connection requests
@ -208,19 +204,29 @@ where
/// HttpServer does not change any configuration for TcpListener,
/// it needs to be configured before passing it to listen() method.
pub fn listen(mut self, lst: net::TcpListener) -> Self {
self.factory.listen(lst);
let token = Token(self.handlers.len());
let addr = lst.local_addr().unwrap();
self.handlers
.push(Box::new(SimpleHandler::new(lst.local_addr().unwrap())));
self.sockets.push(Socket { lst, addr, token });
self
}
/// Use listener for accepting incoming connection requests
pub fn listen_with<A>(
mut self, lst: net::TcpListener, acceptor: A,
) -> io::Result<Self>
pub fn listen_with<A>(mut self, lst: net::TcpListener, acceptor: A) -> Self
where
A: AcceptorService<TcpStream> + Send + 'static,
{
self.factory.listen_with(lst, acceptor);
Ok(self)
let token = Token(self.handlers.len());
let addr = lst.local_addr().unwrap();
self.handlers.push(Box::new(StreamHandler::new(
lst.local_addr().unwrap(),
acceptor,
)));
self.sockets.push(Socket { lst, addr, token });
self
}
#[cfg(feature = "tls")]
@ -233,12 +239,10 @@ where
///
/// HttpServer does not change any configuration for TcpListener,
/// it needs to be configured before passing it to listen() method.
pub fn listen_tls(
self, lst: net::TcpListener, acceptor: TlsAcceptor,
) -> io::Result<Self> {
pub fn listen_tls(self, lst: net::TcpListener, acceptor: TlsAcceptor) -> Self {
use super::NativeTlsAcceptor;
self.listen_with(lst, NativeTlsAcceptor::new(acceptor))
Ok(self.listen_with(lst, NativeTlsAcceptor::new(acceptor)))
}
#[cfg(feature = "alpn")]
@ -262,7 +266,7 @@ where
ServerFlags::HTTP1 | ServerFlags::HTTP2
};
self.listen_with(lst, OpensslAcceptor::with_flags(builder, flags)?)
Ok(self.listen_with(lst, OpensslAcceptor::with_flags(builder, flags)?))
}
#[cfg(feature = "rust-tls")]
@ -274,9 +278,7 @@ where
/// Use listener for accepting incoming tls connection requests
///
/// This method sets alpn protocols to "h2" and "http/1.1"
pub fn listen_rustls(
self, lst: net::TcpListener, builder: ServerConfig,
) -> io::Result<Self> {
pub fn listen_rustls(self, lst: net::TcpListener, builder: ServerConfig) -> Self {
use super::{RustlsAcceptor, ServerFlags};
// alpn support
@ -293,7 +295,16 @@ where
///
/// To bind multiple addresses this method can be called multiple times.
pub fn bind<S: net::ToSocketAddrs>(mut self, addr: S) -> io::Result<Self> {
self.factory.bind(addr)?;
let sockets = self.bind2(addr)?;
for lst in sockets {
let token = Token(self.handlers.len());
let addr = lst.local_addr().unwrap();
self.handlers
.push(Box::new(SimpleHandler::new(lst.local_addr().unwrap())));
self.sockets.push(Socket { lst, addr, token })
}
Ok(self)
}
@ -304,10 +315,51 @@ where
S: net::ToSocketAddrs,
A: AcceptorService<TcpStream> + Send + 'static,
{
self.factory.bind_with(addr, &acceptor)?;
let sockets = self.bind2(addr)?;
for lst in sockets {
let token = Token(self.handlers.len());
let addr = lst.local_addr().unwrap();
self.handlers.push(Box::new(StreamHandler::new(
lst.local_addr().unwrap(),
acceptor.clone(),
)));
self.sockets.push(Socket { lst, addr, token })
}
Ok(self)
}
fn bind2<S: net::ToSocketAddrs>(
&self, addr: S,
) -> io::Result<Vec<net::TcpListener>> {
let mut err = None;
let mut succ = false;
let mut sockets = Vec::new();
for addr in addr.to_socket_addrs()? {
match create_tcp_listener(addr, self.backlog) {
Ok(lst) => {
succ = true;
sockets.push(lst);
}
Err(e) => err = Some(e),
}
}
if !succ {
if let Some(e) = err.take() {
Err(e)
} else {
Err(io::Error::new(
io::ErrorKind::Other,
"Can not bind to address.",
))
}
} else {
Ok(sockets)
}
}
#[cfg(feature = "tls")]
#[doc(hidden)]
#[deprecated(
@ -373,37 +425,59 @@ where
self.bind_with(addr, RustlsAcceptor::with_flags(builder, flags))
}
}
fn start_workers(&mut self, notify: &AcceptNotify) -> Vec<WorkerClient> {
// start workers
let mut workers = Vec::new();
for idx in 0..self.threads {
let (worker, addr) = self.factory.start(idx, notify.clone());
workers.push(worker);
self.workers.push((idx, addr));
}
info!("Starting {} http workers", self.threads);
workers
impl<H: IntoHttpHandler> Into<Box<Service>> for HttpServer<H> {
fn into(self) -> Box<Service> {
Box::new(HttpService {
factory: self.factory,
host: self.host,
keep_alive: self.keep_alive,
handlers: self.handlers,
})
}
}
struct HttpService<H: IntoHttpHandler> {
factory: Arc<Fn() -> Vec<H> + Send + Sync>,
host: Option<String>,
keep_alive: KeepAlive,
handlers: Vec<Box<IoStreamHandler<H::Handler, net::TcpStream>>>,
}
impl<H: IntoHttpHandler + 'static> Service for HttpService<H> {
fn clone(&self) -> Box<Service> {
Box::new(HttpService {
factory: self.factory.clone(),
host: self.host.clone(),
keep_alive: self.keep_alive,
handlers: self.handlers.iter().map(|v| v.clone()).collect(),
})
}
// subscribe to os signals
fn subscribe_to_signals(&self) -> Option<Addr<signal::ProcessSignals>> {
if !self.no_signals {
if let Some(ref signals) = self.signals {
Some(signals.clone())
} else {
Some(System::current().registry().get::<signal::ProcessSignals>())
}
} else {
None
}
fn create(&self, conns: Connections) -> Box<ServiceHandler> {
let addr = self.handlers[0].addr();
let s = ServerSettings::new(Some(addr), &self.host, false);
let apps: Vec<_> = (*self.factory)()
.into_iter()
.map(|h| h.into_handler())
.collect();
let handlers = self.handlers.iter().map(|h| h.clone()).collect();
Box::new(HttpServiceHandler::new(
apps,
handlers,
self.keep_alive,
s,
conns,
))
}
}
impl<H: IntoHttpHandler> HttpServer<H> {
/// Start listening for incoming connections.
///
/// This method starts number of http handler workers in separate threads.
/// This method starts number of http workers in separate threads.
/// For each address this method starts separate thread which does
/// `accept()` in a loop.
///
@ -426,31 +500,25 @@ impl<H: IntoHttpHandler> HttpServer<H> {
/// sys.run(); // <- Run actix system, this method starts all async processes
/// }
/// ```
pub fn start(mut self) -> Addr<Self> {
let sockets = self.factory.take_sockets();
if sockets.is_empty() {
panic!("HttpServer::bind() has to be called before start()");
pub fn start(mut self) -> Addr<Server> {
let mut srv = Server::new()
.workers(self.threads)
.maxconn(self.maxconn)
.maxconnrate(self.maxconnrate)
.shutdown_timeout(self.shutdown_timeout);
srv = if self.exit { srv.system_exit() } else { srv };
srv = if self.no_signals {
srv.disable_signals()
} else {
let notify = self.accept.get_notify();
let workers = self.start_workers(&notify);
srv
};
// start accept thread
for sock in &sockets {
info!("Starting server on http://{}", sock.addr);
}
let rx = self.accept.start(sockets, workers.clone());
// start http server actor
let signals = self.subscribe_to_signals();
let addr = Actor::create(move |ctx| {
ctx.add_stream(rx);
self
});
if let Some(signals) = signals {
signals.do_send(signal::Subscribe(addr.clone().recipient()))
}
addr
}
let sockets: Vec<_> = mem::replace(&mut self.sockets, Vec::new())
.into_iter()
.map(|item| (item.token, item.lst))
.collect();
srv.service(self, sockets).start()
}
/// Spawn new thread and start listening for incoming connections.
@ -484,195 +552,279 @@ impl<H: IntoHttpHandler> HttpServer<H> {
/// Start listening for incoming connections from a stream.
///
/// This method uses only one thread for handling incoming connections.
pub fn start_incoming<T, S>(mut self, stream: S, secure: bool) -> Addr<Self>
pub fn start_incoming<T, S>(self, stream: S, secure: bool)
where
S: Stream<Item = T, Error = io::Error> + Send + 'static,
T: AsyncRead + AsyncWrite + Send + 'static,
{
// set server settings
let addr: net::SocketAddr = "127.0.0.1:8080".parse().unwrap();
let settings = ServerSettings::new(Some(addr), &self.factory.host, secure);
let apps: Vec<_> = (*self.factory.factory)()
let srv_settings = ServerSettings::new(Some(addr), &self.host, secure);
let apps: Vec<_> = (*self.factory)()
.into_iter()
.map(|h| h.into_handler())
.collect();
self.settings = Some(Rc::new(WorkerSettings::new(
let settings = WorkerSettings::create(
apps,
self.factory.keep_alive,
settings,
AcceptNotify::default(),
Arc::new(AtomicUsize::new(0)),
Arc::new(AtomicUsize::new(0)),
)));
self.keep_alive,
srv_settings,
Connections::default(),
);
// start server
let signals = self.subscribe_to_signals();
let addr = HttpServer::create(move |ctx| {
HttpIncoming::create(move |ctx| {
ctx.add_message_stream(stream.map_err(|_| ()).map(move |t| Conn {
io: WrapperStream::new(t),
handler: Token::new(0),
token: Token::new(0),
peer: None,
}));
self
HttpIncoming { settings }
});
if let Some(signals) = signals {
signals.do_send(signal::Subscribe(addr.clone().recipient()))
}
addr
}
}
/// Signals support
/// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system
/// message to `System` actor.
impl<H: IntoHttpHandler> Handler<signal::Signal> for HttpServer<H> {
type Result = ();
fn handle(&mut self, msg: signal::Signal, ctx: &mut Context<Self>) {
match msg.0 {
signal::SignalType::Int => {
info!("SIGINT received, exiting");
self.exit = true;
Handler::<StopServer>::handle(self, StopServer { graceful: false }, ctx);
}
signal::SignalType::Term => {
info!("SIGTERM received, stopping");
self.exit = true;
Handler::<StopServer>::handle(self, StopServer { graceful: true }, ctx);
}
signal::SignalType::Quit => {
info!("SIGQUIT received, exiting");
self.exit = true;
Handler::<StopServer>::handle(self, StopServer { graceful: false }, ctx);
}
_ => (),
}
}
struct HttpIncoming<H: HttpHandler> {
settings: Rc<WorkerSettings<H>>,
}
/// Commands from accept threads
impl<H: IntoHttpHandler> StreamHandler<ServerCommand, ()> for HttpServer<H> {
fn finished(&mut self, _: &mut Context<Self>) {}
fn handle(&mut self, msg: ServerCommand, _: &mut Context<Self>) {
match msg {
ServerCommand::WorkerDied(idx) => {
let mut found = false;
for i in 0..self.workers.len() {
if self.workers[i].0 == idx {
self.workers.swap_remove(i);
found = true;
break;
}
}
if found {
error!("Worker has died {:?}, restarting", idx);
let mut new_idx = self.workers.len();
'found: loop {
for i in 0..self.workers.len() {
if self.workers[i].0 == new_idx {
new_idx += 1;
continue 'found;
}
}
break;
}
let (worker, addr) =
self.factory.start(new_idx, self.accept.get_notify());
self.workers.push((new_idx, addr));
self.accept.send(Command::Worker(worker));
}
}
}
}
impl<H> Actor for HttpIncoming<H>
where
H: HttpHandler,
{
type Context = Context<Self>;
}
impl<T, H> Handler<Conn<T>> for HttpServer<H>
impl<T, H> Handler<Conn<T>> for HttpIncoming<H>
where
T: IoStream,
H: IntoHttpHandler,
H: HttpHandler,
{
type Result = ();
fn handle(&mut self, msg: Conn<T>, _: &mut Context<Self>) -> Self::Result {
Arbiter::spawn(HttpChannel::new(
Rc::clone(self.settings.as_ref().unwrap()),
Rc::clone(&self.settings),
msg.io,
msg.peer,
));
}
}
impl<H: IntoHttpHandler> Handler<PauseServer> for HttpServer<H> {
type Result = ();
fn handle(&mut self, _: PauseServer, _: &mut Context<Self>) {
self.accept.send(Command::Pause);
}
struct HttpServiceHandler<H>
where
H: HttpHandler + 'static,
{
settings: Rc<WorkerSettings<H>>,
handlers: Vec<Box<IoStreamHandler<H, net::TcpStream>>>,
tcp_ka: Option<time::Duration>,
}
impl<H: IntoHttpHandler> Handler<ResumeServer> for HttpServer<H> {
type Result = ();
fn handle(&mut self, _: ResumeServer, _: &mut Context<Self>) {
self.accept.send(Command::Resume);
}
}
impl<H: IntoHttpHandler> Handler<StopServer> for HttpServer<H> {
type Result = Response<(), ()>;
fn handle(&mut self, msg: StopServer, ctx: &mut Context<Self>) -> Self::Result {
// stop accept threads
self.accept.send(Command::Stop);
// stop workers
let (tx, rx) = mpsc::channel(1);
let dur = if msg.graceful {
Some(Duration::new(u64::from(self.shutdown_timeout), 0))
impl<H: HttpHandler + 'static> HttpServiceHandler<H> {
fn new(
apps: Vec<H>, handlers: Vec<Box<IoStreamHandler<H, net::TcpStream>>>,
keep_alive: KeepAlive, settings: ServerSettings, conns: Connections,
) -> HttpServiceHandler<H> {
let tcp_ka = if let KeepAlive::Tcp(val) = keep_alive {
Some(time::Duration::new(val as u64, 0))
} else {
None
};
for worker in &self.workers {
let tx2 = tx.clone();
ctx.spawn(
worker
.1
.send(StopWorker { graceful: dur })
.into_actor(self)
.then(move |_, slf, ctx| {
slf.workers.pop();
if slf.workers.is_empty() {
let _ = tx2.send(());
let settings = WorkerSettings::create(apps, keep_alive, settings, conns);
// we need to stop system if server was spawned
if slf.exit {
ctx.run_later(Duration::from_millis(300), |_, _| {
System::current().stop();
});
}
}
fut::ok(())
}),
);
}
if !self.workers.is_empty() {
Response::async(rx.into_future().map(|_| ()).map_err(|_| ()))
} else {
// we need to stop system if server was spawned
if self.exit {
ctx.run_later(Duration::from_millis(300), |_, _| {
System::current().stop();
});
}
Response::reply(Ok(()))
HttpServiceHandler {
handlers,
tcp_ka,
settings,
}
}
}
}
impl<H> ServiceHandler for HttpServiceHandler<H>
where
H: HttpHandler + 'static,
{
fn handle(
&mut self, token: Token, io: net::TcpStream, peer: Option<net::SocketAddr>,
) {
if self.tcp_ka.is_some() && io.set_keepalive(self.tcp_ka).is_err() {
error!("Can not set socket keep-alive option");
}
self.handlers[token.0].handle(Rc::clone(&self.settings), io, peer);
}
fn shutdown(&self, force: bool) {
if force {
self.settings.head().traverse::<TcpStream, H>();
}
}
}
struct SimpleHandler<Io> {
addr: net::SocketAddr,
io: PhantomData<Io>,
}
impl<Io: IntoAsyncIo> Clone for SimpleHandler<Io> {
fn clone(&self) -> Self {
SimpleHandler {
addr: self.addr,
io: PhantomData,
}
}
}
impl<Io: IntoAsyncIo> SimpleHandler<Io> {
fn new(addr: net::SocketAddr) -> Self {
SimpleHandler {
addr,
io: PhantomData,
}
}
}
impl<H, Io> IoStreamHandler<H, Io> for SimpleHandler<Io>
where
H: HttpHandler,
Io: IntoAsyncIo + Send + 'static,
Io::Io: IoStream,
{
fn addr(&self) -> net::SocketAddr {
self.addr
}
fn clone(&self) -> Box<IoStreamHandler<H, Io>> {
Box::new(Clone::clone(self))
}
fn scheme(&self) -> &'static str {
"http"
}
fn handle(&self, h: Rc<WorkerSettings<H>>, io: Io, peer: Option<net::SocketAddr>) {
let mut io = match io.into_async_io() {
Ok(io) => io,
Err(err) => {
trace!("Failed to create async io: {}", err);
return;
}
};
let _ = io.set_nodelay(true);
current_thread::spawn(HttpChannel::new(h, io, peer));
}
}
struct StreamHandler<A, Io> {
acceptor: A,
addr: net::SocketAddr,
io: PhantomData<Io>,
}
impl<Io: IntoAsyncIo, A: AcceptorService<Io::Io>> StreamHandler<A, Io> {
fn new(addr: net::SocketAddr, acceptor: A) -> Self {
StreamHandler {
addr,
acceptor,
io: PhantomData,
}
}
}
impl<Io: IntoAsyncIo, A: AcceptorService<Io::Io>> Clone for StreamHandler<A, Io> {
fn clone(&self) -> Self {
StreamHandler {
addr: self.addr,
acceptor: self.acceptor.clone(),
io: PhantomData,
}
}
}
impl<H, Io, A> IoStreamHandler<H, Io> for StreamHandler<A, Io>
where
H: HttpHandler,
Io: IntoAsyncIo + Send + 'static,
Io::Io: IoStream,
A: AcceptorService<Io::Io> + Send + 'static,
{
fn addr(&self) -> net::SocketAddr {
self.addr
}
fn clone(&self) -> Box<IoStreamHandler<H, Io>> {
Box::new(Clone::clone(self))
}
fn scheme(&self) -> &'static str {
self.acceptor.scheme()
}
fn handle(&self, h: Rc<WorkerSettings<H>>, io: Io, peer: Option<net::SocketAddr>) {
let mut io = match io.into_async_io() {
Ok(io) => io,
Err(err) => {
trace!("Failed to create async io: {}", err);
return;
}
};
let _ = io.set_nodelay(true);
let rate = h.connection_rate();
current_thread::spawn(self.acceptor.accept(io).then(move |res| {
drop(rate);
match res {
Ok(io) => current_thread::spawn(HttpChannel::new(h, io, peer)),
Err(err) => trace!("Can not establish connection: {}", err),
}
Ok(())
}))
}
}
impl<H, Io: 'static> IoStreamHandler<H, Io> for Box<IoStreamHandler<H, Io>>
where
H: HttpHandler,
Io: IntoAsyncIo,
{
fn addr(&self) -> net::SocketAddr {
self.as_ref().addr()
}
fn clone(&self) -> Box<IoStreamHandler<H, Io>> {
self.as_ref().clone()
}
fn scheme(&self) -> &'static str {
self.as_ref().scheme()
}
fn handle(&self, h: Rc<WorkerSettings<H>>, io: Io, peer: Option<net::SocketAddr>) {
self.as_ref().handle(h, io, peer)
}
}
trait IoStreamHandler<H, Io>: Send
where
H: HttpHandler,
{
fn clone(&self) -> Box<IoStreamHandler<H, Io>>;
fn addr(&self) -> net::SocketAddr;
fn scheme(&self) -> &'static str;
fn handle(&self, h: Rc<WorkerSettings<H>>, io: Io, peer: Option<net::SocketAddr>);
}
fn create_tcp_listener(
addr: net::SocketAddr, backlog: i32,
) -> io::Result<net::TcpListener> {
let builder = match addr {
net::SocketAddr::V4(_) => TcpBuilder::new_v4()?,
net::SocketAddr::V6(_) => TcpBuilder::new_v6()?,
};
builder.reuse_address(true)?;
builder.bind(addr)?;
Ok(builder.listen(backlog)?)
}

View File

@ -1,216 +1,41 @@
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc};
use std::{io, mem, net, time};
use std::{net, time};
use futures::sync::mpsc::{unbounded, SendError, UnboundedSender};
use futures::sync::mpsc::{SendError, UnboundedSender};
use futures::sync::oneshot;
use futures::Future;
use net2::{TcpBuilder, TcpStreamExt};
use tokio::executor::current_thread;
use tokio_tcp::TcpStream;
use actix::msgs::StopArbiter;
use actix::{Actor, Addr, Arbiter, AsyncContext, Context, Handler, Message, Response};
use actix::{Actor, Arbiter, AsyncContext, Context, Handler, Message, Response};
use super::accept::AcceptNotify;
use super::channel::HttpChannel;
use super::settings::{ServerSettings, WorkerSettings};
use super::{
AcceptorService, HttpHandler, IntoAsyncIo, IntoHttpHandler, IoStream, KeepAlive,
};
use super::server::{Connections, ServiceHandler};
use super::Token;
#[derive(Message)]
pub(crate) struct Conn<T> {
pub io: T,
pub handler: Token,
pub token: Token,
pub peer: Option<net::SocketAddr>,
}
#[derive(Clone, Copy)]
pub struct Token(usize);
impl Token {
pub(crate) fn new(val: usize) -> Token {
Token(val)
}
}
pub(crate) struct Socket {
pub lst: net::TcpListener,
pub addr: net::SocketAddr,
pub token: Token,
}
pub(crate) struct WorkerFactory<H: IntoHttpHandler + 'static> {
pub factory: Arc<Fn() -> Vec<H> + Send + Sync>,
pub host: Option<String>,
pub keep_alive: KeepAlive,
pub backlog: i32,
sockets: Vec<Socket>,
handlers: Vec<Box<IoStreamHandler<H::Handler, net::TcpStream>>>,
}
impl<H: IntoHttpHandler + 'static> WorkerFactory<H> {
pub fn new<F>(factory: F) -> Self
where
F: Fn() -> Vec<H> + Send + Sync + 'static,
{
WorkerFactory {
factory: Arc::new(factory),
host: None,
backlog: 2048,
keep_alive: KeepAlive::Os,
sockets: Vec::new(),
handlers: Vec::new(),
}
}
pub fn addrs(&self) -> Vec<net::SocketAddr> {
self.sockets.iter().map(|s| s.addr).collect()
}
pub fn addrs_with_scheme(&self) -> Vec<(net::SocketAddr, &str)> {
self.handlers
.iter()
.map(|s| (s.addr(), s.scheme()))
.collect()
}
pub fn take_sockets(&mut self) -> Vec<Socket> {
mem::replace(&mut self.sockets, Vec::new())
}
pub fn listen(&mut self, lst: net::TcpListener) {
let token = Token(self.handlers.len());
let addr = lst.local_addr().unwrap();
self.handlers
.push(Box::new(SimpleHandler::new(lst.local_addr().unwrap())));
self.sockets.push(Socket { lst, addr, token })
}
pub fn listen_with<A>(&mut self, lst: net::TcpListener, acceptor: A)
where
A: AcceptorService<TcpStream> + Send + 'static,
{
let token = Token(self.handlers.len());
let addr = lst.local_addr().unwrap();
self.handlers.push(Box::new(StreamHandler::new(
lst.local_addr().unwrap(),
acceptor,
)));
self.sockets.push(Socket { lst, addr, token })
}
pub fn bind<S>(&mut self, addr: S) -> io::Result<()>
where
S: net::ToSocketAddrs,
{
let sockets = self.bind2(addr)?;
for lst in sockets {
let token = Token(self.handlers.len());
let addr = lst.local_addr().unwrap();
self.handlers
.push(Box::new(SimpleHandler::new(lst.local_addr().unwrap())));
self.sockets.push(Socket { lst, addr, token })
}
Ok(())
}
pub fn bind_with<S, A>(&mut self, addr: S, acceptor: &A) -> io::Result<()>
where
S: net::ToSocketAddrs,
A: AcceptorService<TcpStream> + Send + 'static,
{
let sockets = self.bind2(addr)?;
for lst in sockets {
let token = Token(self.handlers.len());
let addr = lst.local_addr().unwrap();
self.handlers.push(Box::new(StreamHandler::new(
lst.local_addr().unwrap(),
acceptor.clone(),
)));
self.sockets.push(Socket { lst, addr, token })
}
Ok(())
}
fn bind2<S: net::ToSocketAddrs>(
&self, addr: S,
) -> io::Result<Vec<net::TcpListener>> {
let mut err = None;
let mut succ = false;
let mut sockets = Vec::new();
for addr in addr.to_socket_addrs()? {
match create_tcp_listener(addr, self.backlog) {
Ok(lst) => {
succ = true;
sockets.push(lst);
}
Err(e) => err = Some(e),
}
}
if !succ {
if let Some(e) = err.take() {
Err(e)
} else {
Err(io::Error::new(
io::ErrorKind::Other,
"Can not bind to address.",
))
}
} else {
Ok(sockets)
}
}
pub fn start(
&mut self, idx: usize, notify: AcceptNotify,
) -> (WorkerClient, Addr<Worker>) {
let host = self.host.clone();
let addr = self.handlers[0].addr();
let factory = Arc::clone(&self.factory);
let ka = self.keep_alive;
let (tx, rx) = unbounded::<Conn<net::TcpStream>>();
let client = WorkerClient::new(idx, tx);
let conn = client.conn.clone();
let sslrate = client.sslrate.clone();
let handlers: Vec<_> = self.handlers.iter().map(|v| v.clone()).collect();
let addr = Arbiter::start(move |ctx: &mut Context<_>| {
let s = ServerSettings::new(Some(addr), &host, false);
let apps: Vec<_> =
(*factory)().into_iter().map(|h| h.into_handler()).collect();
ctx.add_message_stream(rx);
let inner = WorkerInner::new(apps, handlers, ka, s, conn, sslrate, notify);
Worker {
inner: Box::new(inner),
}
});
(client, addr)
}
}
#[derive(Clone)]
pub(crate) struct WorkerClient {
pub idx: usize,
tx: UnboundedSender<Conn<net::TcpStream>>,
pub conn: Arc<AtomicUsize>,
pub sslrate: Arc<AtomicUsize>,
conns: Connections,
}
impl WorkerClient {
fn new(idx: usize, tx: UnboundedSender<Conn<net::TcpStream>>) -> Self {
WorkerClient {
idx,
tx,
conn: Arc::new(AtomicUsize::new(0)),
sslrate: Arc::new(AtomicUsize::new(0)),
}
pub fn new(
idx: usize, tx: UnboundedSender<Conn<net::TcpStream>>, conns: Connections,
) -> Self {
WorkerClient { idx, tx, conns }
}
pub fn send(
@ -219,12 +44,8 @@ impl WorkerClient {
self.tx.unbounded_send(msg)
}
pub fn available(&self, maxconn: usize, maxsslrate: usize) -> bool {
if maxsslrate <= self.sslrate.load(Ordering::Relaxed) {
false
} else {
maxconn > self.conn.load(Ordering::Relaxed)
}
pub fn available(&self) -> bool {
self.conns.available()
}
}
@ -243,21 +64,21 @@ impl Message for StopWorker {
/// Worker accepts Socket objects via unbounded channel and start requests
/// processing.
pub(crate) struct Worker {
inner: Box<WorkerHandler>,
conns: Connections,
handlers: Vec<Box<ServiceHandler>>,
}
impl Actor for Worker {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
self.update_date(ctx);
}
}
impl Worker {
fn update_date(&self, ctx: &mut Context<Self>) {
self.inner.update_date();
ctx.run_later(time::Duration::new(1, 0), |slf, ctx| slf.update_date(ctx));
pub(crate) fn new(conns: Connections, handlers: Vec<Box<ServiceHandler>>) -> Self {
Worker { conns, handlers }
}
fn shutdown(&self, force: bool) {
self.handlers.iter().for_each(|h| h.shutdown(force));
}
fn shutdown_timeout(
@ -265,7 +86,7 @@ impl Worker {
) {
// sleep for 1 second and then check again
ctx.run_later(time::Duration::new(1, 0), move |slf, ctx| {
let num = slf.inner.num_channels();
let num = slf.conns.num_connections();
if num == 0 {
let _ = tx.send(true);
Arbiter::current().do_send(StopArbiter(0));
@ -273,7 +94,7 @@ impl Worker {
slf.shutdown_timeout(ctx, tx, d);
} else {
info!("Force shutdown http worker, {} connections", num);
slf.inner.force_shutdown();
slf.shutdown(true);
let _ = tx.send(false);
Arbiter::current().do_send(StopArbiter(0));
}
@ -285,7 +106,7 @@ impl Handler<Conn<net::TcpStream>> for Worker {
type Result = ();
fn handle(&mut self, msg: Conn<net::TcpStream>, _: &mut Context<Self>) {
self.inner.handle_connect(msg)
self.handlers[msg.handler.0].handle(msg.token, msg.io, msg.peer)
}
}
@ -294,253 +115,25 @@ impl Handler<StopWorker> for Worker {
type Result = Response<bool, ()>;
fn handle(&mut self, msg: StopWorker, ctx: &mut Context<Self>) -> Self::Result {
let num = self.inner.num_channels();
let num = self.conns.num_connections();
if num == 0 {
info!("Shutting down http worker, 0 connections");
Response::reply(Ok(true))
} else if let Some(dur) = msg.graceful {
info!("Graceful http worker shutdown, {} connections", num);
self.shutdown(false);
let (tx, rx) = oneshot::channel();
self.shutdown_timeout(ctx, tx, dur);
Response::async(rx.map_err(|_| ()))
let num = self.conns.num_connections();
if num != 0 {
info!("Graceful http worker shutdown, {} connections", num);
self.shutdown_timeout(ctx, tx, dur);
Response::reply(Ok(true))
} else {
Response::async(rx.map_err(|_| ()))
}
} else {
info!("Force shutdown http worker, {} connections", num);
self.inner.force_shutdown();
self.shutdown(true);
Response::reply(Ok(false))
}
}
}
trait WorkerHandler {
fn update_date(&self);
fn handle_connect(&mut self, Conn<net::TcpStream>);
fn force_shutdown(&self);
fn num_channels(&self) -> usize;
}
struct WorkerInner<H>
where
H: HttpHandler + 'static,
{
settings: Rc<WorkerSettings<H>>,
socks: Vec<Box<IoStreamHandler<H, net::TcpStream>>>,
tcp_ka: Option<time::Duration>,
}
impl<H: HttpHandler + 'static> WorkerInner<H> {
pub(crate) fn new(
h: Vec<H>, socks: Vec<Box<IoStreamHandler<H, net::TcpStream>>>,
keep_alive: KeepAlive, settings: ServerSettings, conn: Arc<AtomicUsize>,
sslrate: Arc<AtomicUsize>, notify: AcceptNotify,
) -> WorkerInner<H> {
let tcp_ka = if let KeepAlive::Tcp(val) = keep_alive {
Some(time::Duration::new(val as u64, 0))
} else {
None
};
WorkerInner {
settings: Rc::new(WorkerSettings::new(
h, keep_alive, settings, notify, conn, sslrate,
)),
socks,
tcp_ka,
}
}
}
impl<H> WorkerHandler for WorkerInner<H>
where
H: HttpHandler + 'static,
{
fn update_date(&self) {
self.settings.update_date();
}
fn handle_connect(&mut self, msg: Conn<net::TcpStream>) {
if self.tcp_ka.is_some() && msg.io.set_keepalive(self.tcp_ka).is_err() {
error!("Can not set socket keep-alive option");
}
self.socks[msg.token.0].handle(Rc::clone(&self.settings), msg.io, msg.peer);
}
fn num_channels(&self) -> usize {
self.settings.num_channels()
}
fn force_shutdown(&self) {
self.settings.head().traverse::<TcpStream, H>();
}
}
struct SimpleHandler<Io> {
addr: net::SocketAddr,
io: PhantomData<Io>,
}
impl<Io: IntoAsyncIo> Clone for SimpleHandler<Io> {
fn clone(&self) -> Self {
SimpleHandler {
addr: self.addr,
io: PhantomData,
}
}
}
impl<Io: IntoAsyncIo> SimpleHandler<Io> {
fn new(addr: net::SocketAddr) -> Self {
SimpleHandler {
addr,
io: PhantomData,
}
}
}
impl<H, Io> IoStreamHandler<H, Io> for SimpleHandler<Io>
where
H: HttpHandler,
Io: IntoAsyncIo + Send + 'static,
Io::Io: IoStream,
{
fn addr(&self) -> net::SocketAddr {
self.addr
}
fn clone(&self) -> Box<IoStreamHandler<H, Io>> {
Box::new(Clone::clone(self))
}
fn scheme(&self) -> &'static str {
"http"
}
fn handle(&self, h: Rc<WorkerSettings<H>>, io: Io, peer: Option<net::SocketAddr>) {
let mut io = match io.into_async_io() {
Ok(io) => io,
Err(err) => {
trace!("Failed to create async io: {}", err);
return;
}
};
let _ = io.set_nodelay(true);
current_thread::spawn(HttpChannel::new(h, io, peer));
}
}
struct StreamHandler<A, Io> {
acceptor: A,
addr: net::SocketAddr,
io: PhantomData<Io>,
}
impl<Io: IntoAsyncIo, A: AcceptorService<Io::Io>> StreamHandler<A, Io> {
fn new(addr: net::SocketAddr, acceptor: A) -> Self {
StreamHandler {
addr,
acceptor,
io: PhantomData,
}
}
}
impl<Io: IntoAsyncIo, A: AcceptorService<Io::Io>> Clone for StreamHandler<A, Io> {
fn clone(&self) -> Self {
StreamHandler {
addr: self.addr,
acceptor: self.acceptor.clone(),
io: PhantomData,
}
}
}
impl<H, Io, A> IoStreamHandler<H, Io> for StreamHandler<A, Io>
where
H: HttpHandler,
Io: IntoAsyncIo + Send + 'static,
Io::Io: IoStream,
A: AcceptorService<Io::Io> + Send + 'static,
{
fn addr(&self) -> net::SocketAddr {
self.addr
}
fn clone(&self) -> Box<IoStreamHandler<H, Io>> {
Box::new(Clone::clone(self))
}
fn scheme(&self) -> &'static str {
self.acceptor.scheme()
}
fn handle(&self, h: Rc<WorkerSettings<H>>, io: Io, peer: Option<net::SocketAddr>) {
let mut io = match io.into_async_io() {
Ok(io) => io,
Err(err) => {
trace!("Failed to create async io: {}", err);
return;
}
};
let _ = io.set_nodelay(true);
h.conn_rate_add();
current_thread::spawn(self.acceptor.accept(io).then(move |res| {
h.conn_rate_del();
match res {
Ok(io) => current_thread::spawn(HttpChannel::new(h, io, peer)),
Err(err) => trace!("Can not establish connection: {}", err),
}
Ok(())
}))
}
}
impl<H, Io: 'static> IoStreamHandler<H, Io> for Box<IoStreamHandler<H, Io>>
where
H: HttpHandler,
Io: IntoAsyncIo,
{
fn addr(&self) -> net::SocketAddr {
self.as_ref().addr()
}
fn clone(&self) -> Box<IoStreamHandler<H, Io>> {
self.as_ref().clone()
}
fn scheme(&self) -> &'static str {
self.as_ref().scheme()
}
fn handle(&self, h: Rc<WorkerSettings<H>>, io: Io, peer: Option<net::SocketAddr>) {
self.as_ref().handle(h, io, peer)
}
}
pub(crate) trait IoStreamHandler<H, Io>: Send
where
H: HttpHandler,
{
fn clone(&self) -> Box<IoStreamHandler<H, Io>>;
fn addr(&self) -> net::SocketAddr;
fn scheme(&self) -> &'static str;
fn handle(&self, h: Rc<WorkerSettings<H>>, io: Io, peer: Option<net::SocketAddr>);
}
fn create_tcp_listener(
addr: net::SocketAddr, backlog: i32,
) -> io::Result<net::TcpListener> {
let builder = match addr {
net::SocketAddr::V4(_) => TcpBuilder::new_v4()?,
net::SocketAddr::V6(_) => TcpBuilder::new_v6()?,
};
builder.reuse_address(true)?;
builder.bind(addr)?;
Ok(builder.listen(backlog)?)
}

View File

@ -17,6 +17,8 @@ use tokio::runtime::current_thread::Runtime;
use openssl::ssl::SslAcceptorBuilder;
#[cfg(feature = "rust-tls")]
use rustls::ServerConfig;
#[cfg(feature = "alpn")]
use server::OpensslAcceptor;
#[cfg(feature = "rust-tls")]
use server::RustlsAcceptor;
@ -326,7 +328,7 @@ impl<S: 'static> TestServerBuilder<S> {
config(&mut app);
vec![app]
}).workers(1)
.disable_signals();
.disable_signals();
tx.send((System::current(), addr, TestServer::get_conn()))
.unwrap();
@ -336,7 +338,7 @@ impl<S: 'static> TestServerBuilder<S> {
let ssl = self.ssl.take();
if let Some(ssl) = ssl {
let tcp = net::TcpListener::bind(addr).unwrap();
srv = srv.listen_ssl(tcp, ssl).unwrap();
srv = srv.listen_with(tcp, OpensslAcceptor::new(ssl).unwrap());
}
}
#[cfg(feature = "rust-tls")]
@ -344,7 +346,7 @@ impl<S: 'static> TestServerBuilder<S> {
let ssl = self.rust_ssl.take();
if let Some(ssl) = ssl {
let tcp = net::TcpListener::bind(addr).unwrap();
srv = srv.listen_with(tcp, RustlsAcceptor::new(ssl)).unwrap();
srv = srv.listen_with(tcp, RustlsAcceptor::new(ssl));
}
}
if !has_ssl {
@ -722,8 +724,9 @@ impl<S: 'static> TestRequest<S> {
/// This method generates `HttpRequest` instance and executes handler
pub fn execute<F, R>(self, f: F) -> Result<HttpResponse, Error>
where F: FnOnce(&HttpRequest<S>) -> R,
R: Responder + 'static,
where
F: FnOnce(&HttpRequest<S>) -> R,
R: Responder + 'static,
{
let req = self.finish();
let resp = f(&req);