1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-01-18 05:41:50 +01:00

switch to actix-net server

This commit is contained in:
Nikolay Kim 2018-09-07 23:34:27 -07:00
parent 52195bbf16
commit 1907102685
9 changed files with 341 additions and 1619 deletions

View File

@ -35,7 +35,7 @@ default = ["session", "brotli", "flate2-c"]
tls = ["native-tls", "tokio-tls"] tls = ["native-tls", "tokio-tls"]
# openssl # openssl
alpn = ["openssl", "tokio-openssl"] alpn = ["openssl", "tokio-openssl", "actix-net/ssl"]
# rustls # rustls
rust-tls = ["rustls", "tokio-rustls", "webpki", "webpki-roots"] rust-tls = ["rustls", "tokio-rustls", "webpki", "webpki-roots"]
@ -57,6 +57,7 @@ flate2-rust = ["flate2/rust_backend"]
[dependencies] [dependencies]
actix = "0.7.0" actix = "0.7.0"
actix-net = { git="https://github.com/actix/actix-net.git" }
base64 = "0.9" base64 = "0.9"
bitflags = "1.0" bitflags = "1.0"

View File

@ -140,6 +140,8 @@ extern crate serde_urlencoded;
extern crate percent_encoding; extern crate percent_encoding;
extern crate serde_json; extern crate serde_json;
extern crate smallvec; extern crate smallvec;
extern crate actix_net;
#[macro_use] #[macro_use]
extern crate actix as actix_inner; extern crate actix as actix_inner;

View File

@ -1,475 +0,0 @@
use std::sync::mpsc as sync_mpsc;
use std::time::{Duration, Instant};
use std::{io, net, thread};
use futures::{sync::mpsc, Future};
use mio;
use slab::Slab;
use tokio_timer::Delay;
use actix::{msgs::Execute, Arbiter, System};
use super::server::ServerCommand;
use super::worker::{Conn, WorkerClient};
use super::Token;
pub(crate) enum Command {
Pause,
Resume,
Stop,
Worker(WorkerClient),
}
struct ServerSocketInfo {
addr: net::SocketAddr,
token: Token,
handler: Token,
sock: mio::net::TcpListener,
timeout: Option<Instant>,
}
#[derive(Clone)]
pub(crate) struct AcceptNotify(mio::SetReadiness);
impl AcceptNotify {
pub(crate) fn new(ready: mio::SetReadiness) -> Self {
AcceptNotify(ready)
}
pub(crate) fn notify(&self) {
let _ = self.0.set_readiness(mio::Ready::readable());
}
}
impl Default for AcceptNotify {
fn default() -> Self {
AcceptNotify::new(mio::Registration::new2().1)
}
}
pub(crate) struct AcceptLoop {
cmd_reg: Option<mio::Registration>,
cmd_ready: mio::SetReadiness,
notify_reg: Option<mio::Registration>,
notify_ready: mio::SetReadiness,
tx: sync_mpsc::Sender<Command>,
rx: Option<sync_mpsc::Receiver<Command>>,
srv: Option<(
mpsc::UnboundedSender<ServerCommand>,
mpsc::UnboundedReceiver<ServerCommand>,
)>,
}
impl AcceptLoop {
pub fn new() -> AcceptLoop {
let (tx, rx) = sync_mpsc::channel();
let (cmd_reg, cmd_ready) = mio::Registration::new2();
let (notify_reg, notify_ready) = mio::Registration::new2();
AcceptLoop {
tx,
cmd_ready,
cmd_reg: Some(cmd_reg),
notify_ready,
notify_reg: Some(notify_reg),
rx: Some(rx),
srv: Some(mpsc::unbounded()),
}
}
pub fn send(&self, msg: Command) {
let _ = self.tx.send(msg);
let _ = self.cmd_ready.set_readiness(mio::Ready::readable());
}
pub fn get_notify(&self) -> AcceptNotify {
AcceptNotify::new(self.notify_ready.clone())
}
pub(crate) fn start(
&mut self, socks: Vec<Vec<(Token, net::TcpListener)>>,
workers: Vec<WorkerClient>,
) -> mpsc::UnboundedReceiver<ServerCommand> {
let (tx, rx) = self.srv.take().expect("Can not re-use AcceptInfo");
Accept::start(
self.rx.take().expect("Can not re-use AcceptInfo"),
self.cmd_reg.take().expect("Can not re-use AcceptInfo"),
self.notify_reg.take().expect("Can not re-use AcceptInfo"),
socks,
tx,
workers,
);
rx
}
}
struct Accept {
poll: mio::Poll,
rx: sync_mpsc::Receiver<Command>,
sockets: Slab<ServerSocketInfo>,
workers: Vec<WorkerClient>,
srv: mpsc::UnboundedSender<ServerCommand>,
timer: (mio::Registration, mio::SetReadiness),
next: usize,
backpressure: bool,
}
const DELTA: usize = 100;
const CMD: mio::Token = mio::Token(0);
const TIMER: mio::Token = mio::Token(1);
const NOTIFY: mio::Token = mio::Token(2);
/// This function defines errors that are per-connection. Which basically
/// means that if we get this error from `accept()` system call it means
/// next connection might be ready to be accepted.
///
/// All other errors will incur a timeout before next `accept()` is performed.
/// The timeout is useful to handle resource exhaustion errors like ENFILE
/// and EMFILE. Otherwise, could enter into tight loop.
fn connection_error(e: &io::Error) -> bool {
e.kind() == io::ErrorKind::ConnectionRefused
|| e.kind() == io::ErrorKind::ConnectionAborted
|| e.kind() == io::ErrorKind::ConnectionReset
}
impl Accept {
#![cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
pub(crate) fn start(
rx: sync_mpsc::Receiver<Command>, cmd_reg: mio::Registration,
notify_reg: mio::Registration, socks: Vec<Vec<(Token, net::TcpListener)>>,
srv: mpsc::UnboundedSender<ServerCommand>, workers: Vec<WorkerClient>,
) {
let sys = System::current();
// start accept thread
let _ = thread::Builder::new()
.name("actix-web accept loop".to_owned())
.spawn(move || {
System::set_current(sys);
let mut accept = Accept::new(rx, socks, workers, srv);
// Start listening for incoming commands
if let Err(err) = accept.poll.register(
&cmd_reg,
CMD,
mio::Ready::readable(),
mio::PollOpt::edge(),
) {
panic!("Can not register Registration: {}", err);
}
// Start listening for notify updates
if let Err(err) = accept.poll.register(
&notify_reg,
NOTIFY,
mio::Ready::readable(),
mio::PollOpt::edge(),
) {
panic!("Can not register Registration: {}", err);
}
accept.poll();
});
}
fn new(
rx: sync_mpsc::Receiver<Command>, socks: Vec<Vec<(Token, net::TcpListener)>>,
workers: Vec<WorkerClient>, srv: mpsc::UnboundedSender<ServerCommand>,
) -> Accept {
// Create a poll instance
let poll = match mio::Poll::new() {
Ok(poll) => poll,
Err(err) => panic!("Can not create mio::Poll: {}", err),
};
// Start accept
let mut sockets = Slab::new();
for (idx, srv_socks) in socks.into_iter().enumerate() {
for (hnd_token, lst) in srv_socks {
let addr = lst.local_addr().unwrap();
let server = mio::net::TcpListener::from_std(lst)
.expect("Can not create mio::net::TcpListener");
let entry = sockets.vacant_entry();
let token = entry.key();
// Start listening for incoming connections
if let Err(err) = poll.register(
&server,
mio::Token(token + DELTA),
mio::Ready::readable(),
mio::PollOpt::edge(),
) {
panic!("Can not register io: {}", err);
}
entry.insert(ServerSocketInfo {
addr,
token: hnd_token,
handler: Token(idx),
sock: server,
timeout: None,
});
}
}
// Timer
let (tm, tmr) = mio::Registration::new2();
if let Err(err) =
poll.register(&tm, TIMER, mio::Ready::readable(), mio::PollOpt::edge())
{
panic!("Can not register Registration: {}", err);
}
Accept {
poll,
rx,
sockets,
workers,
srv,
next: 0,
timer: (tm, tmr),
backpressure: false,
}
}
fn poll(&mut self) {
// Create storage for events
let mut events = mio::Events::with_capacity(128);
loop {
if let Err(err) = self.poll.poll(&mut events, None) {
panic!("Poll error: {}", err);
}
for event in events.iter() {
let token = event.token();
match token {
CMD => if !self.process_cmd() {
return;
},
TIMER => self.process_timer(),
NOTIFY => self.backpressure(false),
_ => {
let token = usize::from(token);
if token < DELTA {
continue;
}
self.accept(token - DELTA);
}
}
}
}
}
fn process_timer(&mut self) {
let now = Instant::now();
for (token, info) in self.sockets.iter_mut() {
if let Some(inst) = info.timeout.take() {
if now > inst {
if let Err(err) = self.poll.register(
&info.sock,
mio::Token(token + DELTA),
mio::Ready::readable(),
mio::PollOpt::edge(),
) {
error!("Can not register server socket {}", err);
} else {
info!("Resume accepting connections on {}", info.addr);
}
} else {
info.timeout = Some(inst);
}
}
}
}
fn process_cmd(&mut self) -> bool {
loop {
match self.rx.try_recv() {
Ok(cmd) => match cmd {
Command::Pause => {
for (_, info) in self.sockets.iter_mut() {
if let Err(err) = self.poll.deregister(&info.sock) {
error!("Can not deregister server socket {}", err);
} else {
info!("Paused accepting connections on {}", info.addr);
}
}
}
Command::Resume => {
for (token, info) in self.sockets.iter() {
if let Err(err) = self.poll.register(
&info.sock,
mio::Token(token + DELTA),
mio::Ready::readable(),
mio::PollOpt::edge(),
) {
error!("Can not resume socket accept process: {}", err);
} else {
info!(
"Accepting connections on {} has been resumed",
info.addr
);
}
}
}
Command::Stop => {
for (_, info) in self.sockets.iter() {
let _ = self.poll.deregister(&info.sock);
}
return false;
}
Command::Worker(worker) => {
self.backpressure(false);
self.workers.push(worker);
}
},
Err(err) => match err {
sync_mpsc::TryRecvError::Empty => break,
sync_mpsc::TryRecvError::Disconnected => {
for (_, info) in self.sockets.iter() {
let _ = self.poll.deregister(&info.sock);
}
return false;
}
},
}
}
true
}
fn backpressure(&mut self, on: bool) {
if self.backpressure {
if !on {
self.backpressure = false;
for (token, info) in self.sockets.iter() {
if let Err(err) = self.poll.register(
&info.sock,
mio::Token(token + DELTA),
mio::Ready::readable(),
mio::PollOpt::edge(),
) {
error!("Can not resume socket accept process: {}", err);
} else {
info!("Accepting connections on {} has been resumed", info.addr);
}
}
}
} else if on {
self.backpressure = true;
for (_, info) in self.sockets.iter() {
let _ = self.poll.deregister(&info.sock);
}
}
}
fn accept_one(&mut self, mut msg: Conn<net::TcpStream>) {
if self.backpressure {
while !self.workers.is_empty() {
match self.workers[self.next].send(msg) {
Ok(_) => (),
Err(err) => {
let _ = self.srv.unbounded_send(ServerCommand::WorkerDied(
self.workers[self.next].idx,
));
msg = err.into_inner();
self.workers.swap_remove(self.next);
if self.workers.is_empty() {
error!("No workers");
return;
} else if self.workers.len() <= self.next {
self.next = 0;
}
continue;
}
}
self.next = (self.next + 1) % self.workers.len();
break;
}
} else {
let mut idx = 0;
while idx < self.workers.len() {
idx += 1;
if self.workers[self.next].available() {
match self.workers[self.next].send(msg) {
Ok(_) => {
self.next = (self.next + 1) % self.workers.len();
return;
}
Err(err) => {
let _ = self.srv.unbounded_send(ServerCommand::WorkerDied(
self.workers[self.next].idx,
));
msg = err.into_inner();
self.workers.swap_remove(self.next);
if self.workers.is_empty() {
error!("No workers");
self.backpressure(true);
return;
} else if self.workers.len() <= self.next {
self.next = 0;
}
continue;
}
}
}
self.next = (self.next + 1) % self.workers.len();
}
// enable backpressure
self.backpressure(true);
self.accept_one(msg);
}
}
fn accept(&mut self, token: usize) {
loop {
let msg = if let Some(info) = self.sockets.get_mut(token) {
match info.sock.accept_std() {
Ok((io, addr)) => Conn {
io,
token: info.token,
handler: info.handler,
peer: Some(addr),
},
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return,
Err(ref e) if connection_error(e) => continue,
Err(e) => {
error!("Error accepting connection: {}", e);
if let Err(err) = self.poll.deregister(&info.sock) {
error!("Can not deregister server socket {}", err);
}
// sleep after error
info.timeout = Some(Instant::now() + Duration::from_millis(500));
let r = self.timer.1.clone();
System::current().arbiter().do_send(Execute::new(
move || -> Result<(), ()> {
Arbiter::spawn(
Delay::new(
Instant::now() + Duration::from_millis(510),
).map_err(|_| ())
.and_then(
move |_| {
let _ =
r.set_readiness(mio::Ready::readable());
Ok(())
},
),
);
Ok(())
},
));
return;
}
}
} else {
return;
};
self.accept_one(msg);
}
}
}

View File

@ -8,7 +8,7 @@ use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::Delay; use tokio_timer::Delay;
use super::settings::WorkerSettings; use super::settings::WorkerSettings;
use super::{h1, h2, ConnectionTag, HttpHandler, IoStream}; use super::{h1, h2, HttpHandler, IoStream};
const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0"; const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0";
@ -32,7 +32,6 @@ where
proto: Option<HttpProtocol<T, H>>, proto: Option<HttpProtocol<T, H>>,
node: Option<Node<HttpChannel<T, H>>>, node: Option<Node<HttpChannel<T, H>>>,
ka_timeout: Option<Delay>, ka_timeout: Option<Delay>,
_tag: ConnectionTag,
} }
impl<T, H> HttpChannel<T, H> impl<T, H> HttpChannel<T, H>
@ -43,11 +42,9 @@ where
pub(crate) fn new( pub(crate) fn new(
settings: Rc<WorkerSettings<H>>, io: T, peer: Option<SocketAddr>, settings: Rc<WorkerSettings<H>>, io: T, peer: Option<SocketAddr>,
) -> HttpChannel<T, H> { ) -> HttpChannel<T, H> {
let _tag = settings.connection();
let ka_timeout = settings.keep_alive_timer(); let ka_timeout = settings.keep_alive_timer();
HttpChannel { HttpChannel {
_tag,
ka_timeout, ka_timeout,
node: None, node: None,
proto: Some(HttpProtocol::Unknown( proto: Some(HttpProtocol::Unknown(

View File

@ -5,29 +5,31 @@ use std::{io, mem, net, time};
use actix::{Actor, Addr, Arbiter, AsyncContext, Context, Handler, System}; use actix::{Actor, Addr, Arbiter, AsyncContext, Context, Handler, System};
use futures::{Future, Stream}; use futures::future::{ok, FutureResult};
use net2::{TcpBuilder, TcpStreamExt}; use futures::{Async, Poll, Stream};
use net2::TcpBuilder;
use num_cpus; use num_cpus;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_tcp::TcpStream;
#[cfg(feature = "tls")] use actix_net::{ssl, NewService, Service, Server};
use native_tls::TlsAcceptor;
//#[cfg(feature = "tls")]
//use native_tls::TlsAcceptor;
#[cfg(feature = "alpn")] #[cfg(feature = "alpn")]
use openssl::ssl::SslAcceptorBuilder; use openssl::ssl::SslAcceptorBuilder;
#[cfg(feature = "rust-tls")] //#[cfg(feature = "rust-tls")]
use rustls::ServerConfig; //use rustls::ServerConfig;
use super::channel::{HttpChannel, WrapperStream}; use super::channel::HttpChannel;
use super::server::{Connections, Server, Service, ServiceHandler};
use super::settings::{ServerSettings, WorkerSettings}; use super::settings::{ServerSettings, WorkerSettings};
use super::worker::{Conn, Socket}; use super::{HttpHandler, IntoHttpHandler, IoStream, KeepAlive};
use super::{
AcceptorService, HttpHandler, IntoAsyncIo, IntoHttpHandler, IoStream, KeepAlive, struct Socket<H: IntoHttpHandler> {
Token, lst: net::TcpListener,
}; addr: net::SocketAddr,
handler: Box<IoStreamHandler<H>>,
}
/// An HTTP Server /// An HTTP Server
/// ///
@ -49,8 +51,7 @@ where
no_signals: bool, no_signals: bool,
maxconn: usize, maxconn: usize,
maxconnrate: usize, maxconnrate: usize,
sockets: Vec<Socket>, sockets: Vec<Socket<H>>,
handlers: Vec<Box<IoStreamHandler<H::Handler, net::TcpStream>>>,
} }
impl<H> HttpServer<H> impl<H> HttpServer<H>
@ -75,11 +76,9 @@ where
exit: false, exit: false,
no_http2: false, no_http2: false,
no_signals: false, no_signals: false,
maxconn: 102_400, maxconn: 25_600,
maxconnrate: 256, maxconnrate: 256,
// settings: None,
sockets: Vec::new(), sockets: Vec::new(),
handlers: Vec::new(),
} }
} }
@ -112,7 +111,7 @@ where
/// All socket listeners will stop accepting connections when this limit is reached /// All socket listeners will stop accepting connections when this limit is reached
/// for each worker. /// for each worker.
/// ///
/// By default max connections is set to a 100k. /// By default max connections is set to a 25k.
pub fn maxconn(mut self, num: usize) -> Self { pub fn maxconn(mut self, num: usize) -> Self {
self.maxconn = num; self.maxconn = num;
self self
@ -196,9 +195,9 @@ where
/// and the user should be presented with an enumeration of which /// and the user should be presented with an enumeration of which
/// socket requires which protocol. /// socket requires which protocol.
pub fn addrs_with_scheme(&self) -> Vec<(net::SocketAddr, &str)> { pub fn addrs_with_scheme(&self) -> Vec<(net::SocketAddr, &str)> {
self.handlers self.sockets
.iter() .iter()
.map(|s| (s.addr(), s.scheme())) .map(|s| (s.addr, s.handler.scheme()))
.collect() .collect()
} }
@ -207,78 +206,82 @@ where
/// HttpServer does not change any configuration for TcpListener, /// HttpServer does not change any configuration for TcpListener,
/// it needs to be configured before passing it to listen() method. /// it needs to be configured before passing it to listen() method.
pub fn listen(mut self, lst: net::TcpListener) -> Self { pub fn listen(mut self, lst: net::TcpListener) -> Self {
let token = Token(self.handlers.len());
let addr = lst.local_addr().unwrap(); let addr = lst.local_addr().unwrap();
self.handlers self.sockets.push(Socket {
.push(Box::new(SimpleHandler::new(lst.local_addr().unwrap()))); lst,
self.sockets.push(Socket { lst, addr, token }); addr,
handler: Box::new(SimpleHandler {
addr,
factory: self.factory.clone(),
}),
});
self self
} }
#[doc(hidden)] // #[doc(hidden)]
/// Use listener for accepting incoming connection requests // /// Use listener for accepting incoming connection requests
pub fn listen_with<A>(mut self, lst: net::TcpListener, acceptor: A) -> Self // pub fn listen_with<A>(mut self, lst: net::TcpListener, acceptor: A) -> Self
where // where
A: AcceptorService<TcpStream> + Send + 'static, // A: AcceptorService<TcpStream> + Send + 'static,
{ // {
let token = Token(self.handlers.len()); // let token = Token(self.handlers.len());
let addr = lst.local_addr().unwrap(); // let addr = lst.local_addr().unwrap();
self.handlers.push(Box::new(StreamHandler::new( // self.handlers.push(Box::new(StreamHandler::new(
lst.local_addr().unwrap(), // lst.local_addr().unwrap(),
acceptor, // acceptor,
))); // )));
self.sockets.push(Socket { lst, addr, token }); // self.sockets.push(Socket { lst, addr, token });
self // self
} // }
#[cfg(feature = "tls")] // #[cfg(feature = "tls")]
/// Use listener for accepting incoming tls connection requests // /// Use listener for accepting incoming tls connection requests
/// // ///
/// HttpServer does not change any configuration for TcpListener, // /// HttpServer does not change any configuration for TcpListener,
/// it needs to be configured before passing it to listen() method. // /// it needs to be configured before passing it to listen() method.
pub fn listen_tls(self, lst: net::TcpListener, acceptor: TlsAcceptor) -> Self { // pub fn listen_tls(self, lst: net::TcpListener, acceptor: TlsAcceptor) -> Self {
use super::NativeTlsAcceptor; // use super::NativeTlsAcceptor;
//
// self.listen_with(lst, NativeTlsAcceptor::new(acceptor))
// }
self.listen_with(lst, NativeTlsAcceptor::new(acceptor)) // #[cfg(feature = "alpn")]
} // /// Use listener for accepting incoming tls connection requests
// ///
#[cfg(feature = "alpn")] // /// This method sets alpn protocols to "h2" and "http/1.1"
/// Use listener for accepting incoming tls connection requests // pub fn listen_ssl(
/// // self, lst: net::TcpListener, builder: SslAcceptorBuilder,
/// This method sets alpn protocols to "h2" and "http/1.1" // ) -> io::Result<Self> {
pub fn listen_ssl( // use super::{OpensslAcceptor, ServerFlags};
self, lst: net::TcpListener, builder: SslAcceptorBuilder,
) -> io::Result<Self> {
use super::{OpensslAcceptor, ServerFlags};
// alpn support // alpn support
let flags = if self.no_http2 { // let flags = if self.no_http2 {
ServerFlags::HTTP1 // ServerFlags::HTTP1
} else { // } else {
ServerFlags::HTTP1 | ServerFlags::HTTP2 // ServerFlags::HTTP1 | ServerFlags::HTTP2
}; // };
Ok(self.listen_with(lst, OpensslAcceptor::with_flags(builder, flags)?)) // Ok(self.listen_with(lst, OpensslAcceptor::with_flags(builder, flags)?))
} // }
#[cfg(feature = "rust-tls")] // #[cfg(feature = "rust-tls")]
/// Use listener for accepting incoming tls connection requests // /// Use listener for accepting incoming tls connection requests
/// // ///
/// This method sets alpn protocols to "h2" and "http/1.1" // /// This method sets alpn protocols to "h2" and "http/1.1"
pub fn listen_rustls(self, lst: net::TcpListener, builder: ServerConfig) -> Self { // pub fn listen_rustls(self, lst: net::TcpListener, builder: ServerConfig) -> Self {
use super::{RustlsAcceptor, ServerFlags}; // use super::{RustlsAcceptor, ServerFlags};
// alpn support // // alpn support
let flags = if self.no_http2 { // let flags = if self.no_http2 {
ServerFlags::HTTP1 // ServerFlags::HTTP1
} else { // } else {
ServerFlags::HTTP1 | ServerFlags::HTTP2 // ServerFlags::HTTP1 | ServerFlags::HTTP2
}; // };
//
self.listen_with(lst, RustlsAcceptor::with_flags(builder, flags)) // self.listen_with(lst, RustlsAcceptor::with_flags(builder, flags))
} // }
/// The socket address to bind /// The socket address to bind
/// ///
@ -287,38 +290,34 @@ where
let sockets = self.bind2(addr)?; let sockets = self.bind2(addr)?;
for lst in sockets { for lst in sockets {
let token = Token(self.handlers.len()); self = self.listen(lst);
let addr = lst.local_addr().unwrap();
self.handlers
.push(Box::new(SimpleHandler::new(lst.local_addr().unwrap())));
self.sockets.push(Socket { lst, addr, token })
} }
Ok(self) Ok(self)
} }
/// Start listening for incoming connections with supplied acceptor. // /// Start listening for incoming connections with supplied acceptor.
#[doc(hidden)] // #[doc(hidden)]
#[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))] // #[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))]
pub fn bind_with<S, A>(mut self, addr: S, acceptor: A) -> io::Result<Self> // pub fn bind_with<S, A>(mut self, addr: S, acceptor: A) -> io::Result<Self>
where // where
S: net::ToSocketAddrs, // S: net::ToSocketAddrs,
A: AcceptorService<TcpStream> + Send + 'static, // A: AcceptorService<TcpStream> + Send + 'static,
{ // {
let sockets = self.bind2(addr)?; // let sockets = self.bind2(addr)?;
for lst in sockets { // for lst in sockets {
let token = Token(self.handlers.len()); // let token = Token(self.handlers.len());
let addr = lst.local_addr().unwrap(); // let addr = lst.local_addr().unwrap();
self.handlers.push(Box::new(StreamHandler::new( // self.handlers.push(Box::new(StreamHandler::new(
lst.local_addr().unwrap(), // lst.local_addr().unwrap(),
acceptor.clone(), // acceptor.clone(),
))); // )));
self.sockets.push(Socket { lst, addr, token }) // self.sockets.push(Socket { lst, addr, token })
} // }
Ok(self) // Ok(self)
} // }
fn bind2<S: net::ToSocketAddrs>( fn bind2<S: net::ToSocketAddrs>(
&self, addr: S, &self, addr: S,
@ -350,112 +349,109 @@ where
} }
} }
#[cfg(feature = "tls")] // #[cfg(feature = "tls")]
/// The ssl socket address to bind // /// The ssl socket address to bind
/// // ///
/// To bind multiple addresses this method can be called multiple times. // /// To bind multiple addresses this method can be called multiple times.
pub fn bind_tls<S: net::ToSocketAddrs>( // pub fn bind_tls<S: net::ToSocketAddrs>(
self, addr: S, acceptor: TlsAcceptor, // self, addr: S, acceptor: TlsAcceptor,
) -> io::Result<Self> { // ) -> io::Result<Self> {
use super::NativeTlsAcceptor; // use super::NativeTlsAcceptor;
self.bind_with(addr, NativeTlsAcceptor::new(acceptor)) // self.bind_with(addr, NativeTlsAcceptor::new(acceptor))
} // }
#[cfg(feature = "alpn")] // #[cfg(feature = "alpn")]
/// Start listening for incoming tls connections. // /// Start listening for incoming tls connections.
/// // ///
/// This method sets alpn protocols to "h2" and "http/1.1" // /// This method sets alpn protocols to "h2" and "http/1.1"
pub fn bind_ssl<S>(self, addr: S, builder: SslAcceptorBuilder) -> io::Result<Self> // pub fn bind_ssl<S>(self, addr: S, builder: SslAcceptorBuilder) -> io::Result<Self>
where // where
S: net::ToSocketAddrs, // S: net::ToSocketAddrs,
{ // {
use super::{OpensslAcceptor, ServerFlags}; // use super::{OpensslAcceptor, ServerFlags};
// alpn support // // alpn support
let flags = if !self.no_http2 { // let flags = if !self.no_http2 {
ServerFlags::HTTP1 // ServerFlags::HTTP1
} else { // } else {
ServerFlags::HTTP1 | ServerFlags::HTTP2 // ServerFlags::HTTP1 | ServerFlags::HTTP2
}; // };
self.bind_with(addr, OpensslAcceptor::with_flags(builder, flags)?) // self.bind_with(addr, OpensslAcceptor::with_flags(builder, flags)?)
} // }
#[cfg(feature = "rust-tls")] // #[cfg(feature = "rust-tls")]
/// Start listening for incoming tls connections. // /// Start listening for incoming tls connections.
/// // ///
/// This method sets alpn protocols to "h2" and "http/1.1" // /// This method sets alpn protocols to "h2" and "http/1.1"
pub fn bind_rustls<S: net::ToSocketAddrs>( // pub fn bind_rustls<S: net::ToSocketAddrs>(
self, addr: S, builder: ServerConfig, // self, addr: S, builder: ServerConfig,
) -> io::Result<Self> { // ) -> io::Result<Self> {
use super::{RustlsAcceptor, ServerFlags}; // use super::{RustlsAcceptor, ServerFlags};
// alpn support // // alpn support
let flags = if !self.no_http2 { // let flags = if !self.no_http2 {
ServerFlags::HTTP1 // ServerFlags::HTTP1
} else { // } else {
ServerFlags::HTTP1 | ServerFlags::HTTP2 // ServerFlags::HTTP1 | ServerFlags::HTTP2
}; // };
self.bind_with(addr, RustlsAcceptor::with_flags(builder, flags)) // self.bind_with(addr, RustlsAcceptor::with_flags(builder, flags))
} // }
} }
impl<H: IntoHttpHandler> Into<(Box<Service>, Vec<(Token, net::TcpListener)>)> struct HttpService<H, F, Io>
for HttpServer<H> where
H: HttpHandler,
F: IntoHttpHandler<Handler = H>,
Io: IoStream,
{ {
fn into(mut self) -> (Box<Service>, Vec<(Token, net::TcpListener)>) { factory: Arc<Fn() -> Vec<F> + Send + Sync>,
let sockets: Vec<_> = mem::replace(&mut self.sockets, Vec::new()) addr: net::SocketAddr,
.into_iter()
.map(|item| (item.token, item.lst))
.collect();
(
Box::new(HttpService {
factory: self.factory,
host: self.host,
keep_alive: self.keep_alive,
handlers: self.handlers,
}),
sockets,
)
}
}
struct HttpService<H: IntoHttpHandler> {
factory: Arc<Fn() -> Vec<H> + Send + Sync>,
host: Option<String>, host: Option<String>,
keep_alive: KeepAlive, keep_alive: KeepAlive,
handlers: Vec<Box<IoStreamHandler<H::Handler, net::TcpStream>>>, _t: PhantomData<(H, Io)>,
} }
impl<H: IntoHttpHandler + 'static> Service for HttpService<H> { impl<H, F, Io> NewService for HttpService<H, F, Io>
fn clone(&self) -> Box<Service> { where
Box::new(HttpService { H: HttpHandler,
factory: self.factory.clone(), F: IntoHttpHandler<Handler = H>,
host: self.host.clone(), Io: IoStream,
keep_alive: self.keep_alive, {
handlers: self.handlers.iter().map(|v| v.clone()).collect(), type Request = Io;
}) type Response = ();
} type Error = ();
type InitError = ();
type Service = HttpServiceHandler<H, Io>;
type Future = FutureResult<Self::Service, Self::Error>;
fn create(&self, conns: Connections) -> Box<ServiceHandler> { fn new_service(&self) -> Self::Future {
let addr = self.handlers[0].addr(); let s = ServerSettings::new(Some(self.addr), &self.host, false);
let s = ServerSettings::new(Some(addr), &self.host, false);
let apps: Vec<_> = (*self.factory)() let apps: Vec<_> = (*self.factory)()
.into_iter() .into_iter()
.map(|h| h.into_handler()) .map(|h| h.into_handler())
.collect(); .collect();
let handlers = self.handlers.iter().map(|h| h.clone()).collect();
Box::new(HttpServiceHandler::new( ok(HttpServiceHandler::new(apps, self.keep_alive, s))
apps, }
handlers, }
self.keep_alive,
s, impl<H, F, Io> Clone for HttpService<H, F, Io>
conns, where
)) H: HttpHandler,
F: IntoHttpHandler<Handler = H>,
Io: IoStream,
{
fn clone(&self) -> HttpService<H, F, Io> {
HttpService {
addr: self.addr,
factory: self.factory.clone(),
host: self.host.clone(),
keep_alive: self.keep_alive,
_t: PhantomData,
}
} }
} }
@ -485,11 +481,12 @@ impl<H: IntoHttpHandler> HttpServer<H> {
/// sys.run(); // <- Run actix system, this method starts all async processes /// sys.run(); // <- Run actix system, this method starts all async processes
/// } /// }
/// ``` /// ```
pub fn start(self) -> Addr<Server> { pub fn start(mut self) -> Addr<Server> {
ssl::max_concurrent_ssl_connect(self.maxconnrate);
let mut srv = Server::new() let mut srv = Server::new()
.workers(self.threads) .workers(self.threads)
.maxconn(self.maxconn) .maxconn(self.maxconn)
.maxconnrate(self.maxconnrate)
.shutdown_timeout(self.shutdown_timeout); .shutdown_timeout(self.shutdown_timeout);
srv = if self.exit { srv.system_exit() } else { srv }; srv = if self.exit { srv.system_exit() } else { srv };
@ -499,7 +496,17 @@ impl<H: IntoHttpHandler> HttpServer<H> {
srv srv
}; };
srv.service(self).start() let sockets = mem::replace(&mut self.sockets, Vec::new());
for socket in sockets {
let Socket {
lst,
addr: _,
handler,
} = socket;
srv = handler.register(srv, lst, self.host.clone(), self.keep_alive);
}
srv.start()
} }
/// Spawn new thread and start listening for incoming connections. /// Spawn new thread and start listening for incoming connections.
@ -529,275 +536,185 @@ impl<H: IntoHttpHandler> HttpServer<H> {
} }
} }
impl<H: IntoHttpHandler> HttpServer<H> { // impl<H: IntoHttpHandler> HttpServer<H> {
/// Start listening for incoming connections from a stream. // /// Start listening for incoming connections from a stream.
/// // ///
/// This method uses only one thread for handling incoming connections. // /// This method uses only one thread for handling incoming connections.
pub fn start_incoming<T, S>(self, stream: S, secure: bool) // pub fn start_incoming<T, S>(self, stream: S, secure: bool)
where // where
S: Stream<Item = T, Error = io::Error> + Send + 'static, // S: Stream<Item = T, Error = io::Error> + Send + 'static,
T: AsyncRead + AsyncWrite + Send + 'static, // T: AsyncRead + AsyncWrite + Send + 'static,
{ // {
// set server settings // // set server settings
let addr: net::SocketAddr = "127.0.0.1:8080".parse().unwrap(); // let addr: net::SocketAddr = "127.0.0.1:8080".parse().unwrap();
let srv_settings = ServerSettings::new(Some(addr), &self.host, secure); // let srv_settings = ServerSettings::new(Some(addr), &self.host, secure);
let apps: Vec<_> = (*self.factory)() // let apps: Vec<_> = (*self.factory)()
.into_iter() // .into_iter()
.map(|h| h.into_handler()) // .map(|h| h.into_handler())
.collect(); // .collect();
let settings = WorkerSettings::create( // let settings = WorkerSettings::create(
apps, // apps,
self.keep_alive, // self.keep_alive,
srv_settings, // srv_settings,
Connections::default(), // );
);
// start server // // start server
HttpIncoming::create(move |ctx| { // HttpIncoming::create(move |ctx| {
ctx.add_message_stream(stream.map_err(|_| ()).map(move |t| Conn { // ctx.add_message_stream(stream.map_err(|_| ()).map(move |t| Conn {
io: WrapperStream::new(t), // io: WrapperStream::new(t),
handler: Token::new(0), // handler: Token::new(0),
token: Token::new(0), // token: Token::new(0),
peer: None, // peer: None,
})); // }));
HttpIncoming { settings } // HttpIncoming { settings }
}); // });
} // }
} // }
struct HttpIncoming<H: HttpHandler> { // struct HttpIncoming<H: HttpHandler> {
settings: Rc<WorkerSettings<H>>, // settings: Rc<WorkerSettings<H>>,
} // }
impl<H> Actor for HttpIncoming<H> // impl<H> Actor for HttpIncoming<H>
// where
// H: HttpHandler,
// {
// type Context = Context<Self>;
// }
// impl<T, H> Handler<Conn<T>> for HttpIncoming<H>
// where
// T: IoStream,
// H: HttpHandler,
// {
// type Result = ();
// fn handle(&mut self, msg: Conn<T>, _: &mut Context<Self>) -> Self::Result {
// spawn(HttpChannel::new(
// Rc::clone(&self.settings),
// msg.io,
// msg.peer,
// ));
// }
// }
struct HttpServiceHandler<H, Io>
where where
H: HttpHandler, H: HttpHandler,
{ Io: IoStream,
type Context = Context<Self>;
}
impl<T, H> Handler<Conn<T>> for HttpIncoming<H>
where
T: IoStream,
H: HttpHandler,
{
type Result = ();
fn handle(&mut self, msg: Conn<T>, _: &mut Context<Self>) -> Self::Result {
Arbiter::spawn(HttpChannel::new(
Rc::clone(&self.settings),
msg.io,
msg.peer,
));
}
}
struct HttpServiceHandler<H>
where
H: HttpHandler + 'static,
{ {
settings: Rc<WorkerSettings<H>>, settings: Rc<WorkerSettings<H>>,
handlers: Vec<Box<IoStreamHandler<H, net::TcpStream>>>,
tcp_ka: Option<time::Duration>, tcp_ka: Option<time::Duration>,
_t: PhantomData<Io>,
} }
impl<H: HttpHandler + 'static> HttpServiceHandler<H> { impl<H, Io> HttpServiceHandler<H, Io>
where
H: HttpHandler,
Io: IoStream,
{
fn new( fn new(
apps: Vec<H>, handlers: Vec<Box<IoStreamHandler<H, net::TcpStream>>>, apps: Vec<H>, keep_alive: KeepAlive, settings: ServerSettings,
keep_alive: KeepAlive, settings: ServerSettings, conns: Connections, ) -> HttpServiceHandler<H, Io> {
) -> HttpServiceHandler<H> {
let tcp_ka = if let KeepAlive::Tcp(val) = keep_alive { let tcp_ka = if let KeepAlive::Tcp(val) = keep_alive {
Some(time::Duration::new(val as u64, 0)) Some(time::Duration::new(val as u64, 0))
} else { } else {
None None
}; };
let settings = WorkerSettings::create(apps, keep_alive, settings, conns); let settings = WorkerSettings::create(apps, keep_alive, settings);
HttpServiceHandler { HttpServiceHandler {
handlers,
tcp_ka, tcp_ka,
settings, settings,
_t: PhantomData,
} }
} }
} }
impl<H> ServiceHandler for HttpServiceHandler<H> impl<H, Io> Service for HttpServiceHandler<H, Io>
where where
H: HttpHandler + 'static, H: HttpHandler,
Io: IoStream,
{ {
fn handle( type Request = Io;
&mut self, token: Token, io: net::TcpStream, peer: Option<net::SocketAddr>, type Response = ();
) { type Error = ();
if self.tcp_ka.is_some() && io.set_keepalive(self.tcp_ka).is_err() { type Future = HttpChannel<Io, H>;
error!("Can not set socket keep-alive option");
} fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.handlers[token.0].handle(Rc::clone(&self.settings), io, peer); Ok(Async::Ready(()))
} }
fn shutdown(&self, force: bool) { fn call(&mut self, mut req: Self::Request) -> Self::Future {
if force { let _ = req.set_nodelay(true);
self.settings HttpChannel::new(Rc::clone(&self.settings), req, None)
.head()
.traverse(|ch: &mut HttpChannel<TcpStream, H>| ch.shutdown());
}
} }
// fn shutdown(&self, force: bool) {
// if force {
// self.settings.head().traverse::<TcpStream, H>();
// }
// }
} }
struct SimpleHandler<Io> { trait IoStreamHandler<H>: Send
addr: net::SocketAddr, where
io: PhantomData<Io>, H: IntoHttpHandler,
{
fn addr(&self) -> net::SocketAddr;
fn scheme(&self) -> &'static str;
fn register(
&self, server: Server, lst: net::TcpListener, host: Option<String>,
keep_alive: KeepAlive,
) -> Server;
} }
impl<Io: IntoAsyncIo> Clone for SimpleHandler<Io> { struct SimpleHandler<H>
where
H: IntoHttpHandler,
{
pub addr: net::SocketAddr,
pub factory: Arc<Fn() -> Vec<H> + Send + Sync>,
}
impl<H: IntoHttpHandler> Clone for SimpleHandler<H> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
SimpleHandler { SimpleHandler {
addr: self.addr, addr: self.addr,
io: PhantomData, factory: self.factory.clone(),
} }
} }
} }
impl<Io: IntoAsyncIo> SimpleHandler<Io> { impl<H> IoStreamHandler<H> for SimpleHandler<H>
fn new(addr: net::SocketAddr) -> Self {
SimpleHandler {
addr,
io: PhantomData,
}
}
}
impl<H, Io> IoStreamHandler<H, Io> for SimpleHandler<Io>
where where
H: HttpHandler, H: IntoHttpHandler + 'static,
Io: IntoAsyncIo + Send + 'static,
Io::Io: IoStream,
{ {
fn addr(&self) -> net::SocketAddr { fn addr(&self) -> net::SocketAddr {
self.addr self.addr
} }
fn clone(&self) -> Box<IoStreamHandler<H, Io>> {
Box::new(Clone::clone(self))
}
fn scheme(&self) -> &'static str { fn scheme(&self) -> &'static str {
"http" "http"
} }
fn handle(&self, h: Rc<WorkerSettings<H>>, io: Io, peer: Option<net::SocketAddr>) { fn register(
let mut io = match io.into_async_io() { &self, server: Server, lst: net::TcpListener, host: Option<String>,
Ok(io) => io, keep_alive: KeepAlive,
Err(err) => { ) -> Server {
trace!("Failed to create async io: {}", err); let addr = self.addr;
return; let factory = self.factory.clone();
}
};
let _ = io.set_nodelay(true);
Arbiter::spawn(HttpChannel::new(h, io, peer)); server.listen(lst, move || HttpService {
} keep_alive,
}
struct StreamHandler<A, Io> {
acceptor: A,
addr: net::SocketAddr,
io: PhantomData<Io>,
}
impl<Io: IntoAsyncIo, A: AcceptorService<Io::Io>> StreamHandler<A, Io> {
fn new(addr: net::SocketAddr, acceptor: A) -> Self {
StreamHandler {
addr, addr,
acceptor, host: host.clone(),
io: PhantomData, factory: factory.clone(),
_t: PhantomData,
})
} }
}
}
impl<Io: IntoAsyncIo, A: AcceptorService<Io::Io>> Clone for StreamHandler<A, Io> {
fn clone(&self) -> Self {
StreamHandler {
addr: self.addr,
acceptor: self.acceptor.clone(),
io: PhantomData,
}
}
}
impl<H, Io, A> IoStreamHandler<H, Io> for StreamHandler<A, Io>
where
H: HttpHandler,
Io: IntoAsyncIo + Send + 'static,
Io::Io: IoStream,
A: AcceptorService<Io::Io> + Send + 'static,
{
fn addr(&self) -> net::SocketAddr {
self.addr
}
fn clone(&self) -> Box<IoStreamHandler<H, Io>> {
Box::new(Clone::clone(self))
}
fn scheme(&self) -> &'static str {
self.acceptor.scheme()
}
fn handle(&self, h: Rc<WorkerSettings<H>>, io: Io, peer: Option<net::SocketAddr>) {
let mut io = match io.into_async_io() {
Ok(io) => io,
Err(err) => {
trace!("Failed to create async io: {}", err);
return;
}
};
let _ = io.set_nodelay(true);
let rate = h.connection_rate();
Arbiter::spawn(self.acceptor.accept(io).then(move |res| {
drop(rate);
match res {
Ok(io) => Arbiter::spawn(HttpChannel::new(h, io, peer)),
Err(err) => trace!("Can not establish connection: {}", err),
}
Ok(())
}))
}
}
impl<H, Io: 'static> IoStreamHandler<H, Io> for Box<IoStreamHandler<H, Io>>
where
H: HttpHandler,
Io: IntoAsyncIo,
{
fn addr(&self) -> net::SocketAddr {
self.as_ref().addr()
}
fn clone(&self) -> Box<IoStreamHandler<H, Io>> {
self.as_ref().clone()
}
fn scheme(&self) -> &'static str {
self.as_ref().scheme()
}
fn handle(&self, h: Rc<WorkerSettings<H>>, io: Io, peer: Option<net::SocketAddr>) {
self.as_ref().handle(h, io, peer)
}
}
trait IoStreamHandler<H, Io>: Send
where
H: HttpHandler,
{
fn clone(&self) -> Box<IoStreamHandler<H, Io>>;
fn addr(&self) -> net::SocketAddr;
fn scheme(&self) -> &'static str;
fn handle(&self, h: Rc<WorkerSettings<H>>, io: Io, peer: Option<net::SocketAddr>);
} }
fn create_tcp_listener( fn create_tcp_listener(

View File

@ -108,15 +108,13 @@
//! ``` //! ```
use std::net::Shutdown; use std::net::Shutdown;
use std::rc::Rc; use std::rc::Rc;
use std::{io, net, time}; use std::{io, time};
use bytes::{BufMut, BytesMut}; use bytes::{BufMut, BytesMut};
use futures::{Async, Future, Poll}; use futures::{Async, Poll};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_reactor::Handle;
use tokio_tcp::TcpStream; use tokio_tcp::TcpStream;
pub(crate) mod accept;
mod channel; mod channel;
mod error; mod error;
pub(crate) mod h1; pub(crate) mod h1;
@ -129,25 +127,15 @@ mod http;
pub(crate) mod input; pub(crate) mod input;
pub(crate) mod message; pub(crate) mod message;
pub(crate) mod output; pub(crate) mod output;
mod server;
pub(crate) mod settings; pub(crate) mod settings;
mod ssl; mod ssl;
mod worker;
use actix::Message; use actix::Message;
pub use self::message::Request;
pub use self::http::HttpServer; pub use self::http::HttpServer;
#[doc(hidden)] pub use self::message::Request;
pub use self::server::{
ConnectionRateTag, ConnectionTag, Connections, Server, Service, ServiceHandler,
};
pub use self::settings::ServerSettings; pub use self::settings::ServerSettings;
#[doc(hidden)]
pub use self::ssl::*;
#[doc(hidden)] #[doc(hidden)]
pub use self::helpers::write_content_length; pub use self::helpers::write_content_length;
@ -322,35 +310,6 @@ impl<T: HttpHandler> IntoHttpHandler for T {
} }
} }
pub(crate) trait IntoAsyncIo {
type Io: AsyncRead + AsyncWrite;
fn into_async_io(self) -> Result<Self::Io, io::Error>;
}
impl IntoAsyncIo for net::TcpStream {
type Io = TcpStream;
fn into_async_io(self) -> Result<Self::Io, io::Error> {
TcpStream::from_std(self, &Handle::default())
}
}
#[doc(hidden)]
/// Trait implemented by types that could accept incomming socket connections.
pub trait AcceptorService<Io: AsyncRead + AsyncWrite>: Clone {
/// Established connection type
type Accepted: IoStream;
/// Future describes async accept process.
type Future: Future<Item = Self::Accepted, Error = io::Error> + 'static;
/// Establish new connection
fn accept(&self, io: Io) -> Self::Future;
/// Scheme
fn scheme(&self) -> &'static str;
}
#[doc(hidden)] #[doc(hidden)]
#[derive(Debug)] #[derive(Debug)]
pub enum WriterState { pub enum WriterState {

View File

@ -1,528 +0,0 @@
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use std::time::Duration;
use std::{mem, net};
use futures::sync::{mpsc, mpsc::unbounded};
use futures::{Future, Sink, Stream};
use num_cpus;
use actix::{
fut, signal, Actor, ActorFuture, Addr, Arbiter, AsyncContext, Context, Handler,
Response, StreamHandler, System, WrapFuture,
};
use super::accept::{AcceptLoop, AcceptNotify, Command};
use super::worker::{Conn, StopWorker, Worker, WorkerClient};
use super::{PauseServer, ResumeServer, StopServer, Token};
#[doc(hidden)]
/// Describes service that could be used
/// with [Server](struct.Server.html)
pub trait Service: Send + 'static {
/// Clone service
fn clone(&self) -> Box<Service>;
/// Create service handler for this service
fn create(&self, conn: Connections) -> Box<ServiceHandler>;
}
impl Service for Box<Service> {
fn clone(&self) -> Box<Service> {
self.as_ref().clone()
}
fn create(&self, conn: Connections) -> Box<ServiceHandler> {
self.as_ref().create(conn)
}
}
#[doc(hidden)]
/// Describes the way serivce handles incoming
/// TCP connections.
pub trait ServiceHandler {
/// Handle incoming stream
fn handle(
&mut self, token: Token, io: net::TcpStream, peer: Option<net::SocketAddr>,
);
/// Shutdown open handlers
fn shutdown(&self, _: bool) {}
}
pub(crate) enum ServerCommand {
WorkerDied(usize),
}
/// Generic server
#[doc(hidden)]
pub struct Server {
threads: usize,
workers: Vec<(usize, Addr<Worker>)>,
services: Vec<Box<Service>>,
sockets: Vec<Vec<(Token, net::TcpListener)>>,
accept: AcceptLoop,
exit: bool,
shutdown_timeout: u16,
signals: Option<Addr<signal::ProcessSignals>>,
no_signals: bool,
maxconn: usize,
maxconnrate: usize,
}
impl Default for Server {
fn default() -> Self {
Self::new()
}
}
impl Server {
/// Create new Server instance
pub fn new() -> Server {
Server {
threads: num_cpus::get(),
workers: Vec::new(),
services: Vec::new(),
sockets: Vec::new(),
accept: AcceptLoop::new(),
exit: false,
shutdown_timeout: 30,
signals: None,
no_signals: false,
maxconn: 102_400,
maxconnrate: 256,
}
}
/// Set number of workers to start.
///
/// By default http server uses number of available logical cpu as threads
/// count.
pub fn workers(mut self, num: usize) -> Self {
self.threads = num;
self
}
/// Sets the maximum per-worker number of concurrent connections.
///
/// All socket listeners will stop accepting connections when this limit is reached
/// for each worker.
///
/// By default max connections is set to a 100k.
pub fn maxconn(mut self, num: usize) -> Self {
self.maxconn = num;
self
}
/// Sets the maximum per-worker concurrent connection establish process.
///
/// All listeners will stop accepting connections when this limit is reached. It
/// can be used to limit the global SSL CPU usage.
///
/// By default max connections is set to a 256.
pub fn maxconnrate(mut self, num: usize) -> Self {
self.maxconnrate = num;
self
}
/// Stop actix system.
///
/// `SystemExit` message stops currently running system.
pub fn system_exit(mut self) -> Self {
self.exit = true;
self
}
#[doc(hidden)]
/// Set alternative address for `ProcessSignals` actor.
pub fn signals(mut self, addr: Addr<signal::ProcessSignals>) -> Self {
self.signals = Some(addr);
self
}
/// Disable signal handling
pub fn disable_signals(mut self) -> Self {
self.no_signals = true;
self
}
/// Timeout for graceful workers shutdown.
///
/// After receiving a stop signal, workers have this much time to finish
/// serving requests. Workers still alive after the timeout are force
/// dropped.
///
/// By default shutdown timeout sets to 30 seconds.
pub fn shutdown_timeout(mut self, sec: u16) -> Self {
self.shutdown_timeout = sec;
self
}
/// Add new service to server
pub fn service<T>(mut self, srv: T) -> Self
where
T: Into<(Box<Service>, Vec<(Token, net::TcpListener)>)>,
{
let (srv, sockets) = srv.into();
self.services.push(srv);
self.sockets.push(sockets);
self
}
/// Spawn new thread and start listening for incoming connections.
///
/// This method spawns new thread and starts new actix system. Other than
/// that it is similar to `start()` method. This method blocks.
///
/// This methods panics if no socket addresses get bound.
///
/// ```rust,ignore
/// # extern crate futures;
/// # extern crate actix_web;
/// # use futures::Future;
/// use actix_web::*;
///
/// fn main() {
/// Server::new().
/// .service(
/// HttpServer::new(|| App::new().resource("/", |r| r.h(|_| HttpResponse::Ok())))
/// .bind("127.0.0.1:0")
/// .expect("Can not bind to 127.0.0.1:0"))
/// .run();
/// }
/// ```
pub fn run(self) {
let sys = System::new("http-server");
self.start();
sys.run();
}
/// Starts Server Actor and returns its address
pub fn start(mut self) -> Addr<Server> {
if self.sockets.is_empty() {
panic!("Service should have at least one bound socket");
} else {
info!("Starting {} http workers", self.threads);
// start workers
let mut workers = Vec::new();
for idx in 0..self.threads {
let (addr, worker) = self.start_worker(idx, self.accept.get_notify());
workers.push(worker);
self.workers.push((idx, addr));
}
// start accept thread
for sock in &self.sockets {
for s in sock.iter() {
info!("Starting server on http://{}", s.1.local_addr().unwrap());
}
}
let rx = self
.accept
.start(mem::replace(&mut self.sockets, Vec::new()), workers);
// start http server actor
let signals = self.subscribe_to_signals();
let addr = Actor::create(move |ctx| {
ctx.add_stream(rx);
self
});
if let Some(signals) = signals {
signals.do_send(signal::Subscribe(addr.clone().recipient()))
}
addr
}
}
// subscribe to os signals
fn subscribe_to_signals(&self) -> Option<Addr<signal::ProcessSignals>> {
if !self.no_signals {
if let Some(ref signals) = self.signals {
Some(signals.clone())
} else {
Some(System::current().registry().get::<signal::ProcessSignals>())
}
} else {
None
}
}
fn start_worker(
&self, idx: usize, notify: AcceptNotify,
) -> (Addr<Worker>, WorkerClient) {
let (tx, rx) = unbounded::<Conn<net::TcpStream>>();
let conns = Connections::new(notify, self.maxconn, self.maxconnrate);
let worker = WorkerClient::new(idx, tx, conns.clone());
let services: Vec<_> = self.services.iter().map(|v| v.clone()).collect();
let addr = Arbiter::start(move |ctx: &mut Context<_>| {
ctx.add_message_stream(rx);
let handlers: Vec<_> = services
.into_iter()
.map(|s| s.create(conns.clone()))
.collect();
Worker::new(conns, handlers)
});
(addr, worker)
}
}
impl Actor for Server {
type Context = Context<Self>;
}
/// Signals support
/// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system
/// message to `System` actor.
impl Handler<signal::Signal> for Server {
type Result = ();
fn handle(&mut self, msg: signal::Signal, ctx: &mut Context<Self>) {
match msg.0 {
signal::SignalType::Int => {
info!("SIGINT received, exiting");
self.exit = true;
Handler::<StopServer>::handle(self, StopServer { graceful: false }, ctx);
}
signal::SignalType::Term => {
info!("SIGTERM received, stopping");
self.exit = true;
Handler::<StopServer>::handle(self, StopServer { graceful: true }, ctx);
}
signal::SignalType::Quit => {
info!("SIGQUIT received, exiting");
self.exit = true;
Handler::<StopServer>::handle(self, StopServer { graceful: false }, ctx);
}
_ => (),
}
}
}
impl Handler<PauseServer> for Server {
type Result = ();
fn handle(&mut self, _: PauseServer, _: &mut Context<Self>) {
self.accept.send(Command::Pause);
}
}
impl Handler<ResumeServer> for Server {
type Result = ();
fn handle(&mut self, _: ResumeServer, _: &mut Context<Self>) {
self.accept.send(Command::Resume);
}
}
impl Handler<StopServer> for Server {
type Result = Response<(), ()>;
fn handle(&mut self, msg: StopServer, ctx: &mut Context<Self>) -> Self::Result {
// stop accept thread
self.accept.send(Command::Stop);
// stop workers
let (tx, rx) = mpsc::channel(1);
let dur = if msg.graceful {
Some(Duration::new(u64::from(self.shutdown_timeout), 0))
} else {
None
};
for worker in &self.workers {
let tx2 = tx.clone();
ctx.spawn(
worker
.1
.send(StopWorker { graceful: dur })
.into_actor(self)
.then(move |_, slf, ctx| {
slf.workers.pop();
if slf.workers.is_empty() {
let _ = tx2.send(());
// we need to stop system if server was spawned
if slf.exit {
ctx.run_later(Duration::from_millis(300), |_, _| {
System::current().stop();
});
}
}
fut::ok(())
}),
);
}
if !self.workers.is_empty() {
Response::async(rx.into_future().map(|_| ()).map_err(|_| ()))
} else {
// we need to stop system if server was spawned
if self.exit {
ctx.run_later(Duration::from_millis(300), |_, _| {
System::current().stop();
});
}
Response::reply(Ok(()))
}
}
}
/// Commands from accept threads
impl StreamHandler<ServerCommand, ()> for Server {
fn finished(&mut self, _: &mut Context<Self>) {}
fn handle(&mut self, msg: ServerCommand, _: &mut Context<Self>) {
match msg {
ServerCommand::WorkerDied(idx) => {
let mut found = false;
for i in 0..self.workers.len() {
if self.workers[i].0 == idx {
self.workers.swap_remove(i);
found = true;
break;
}
}
if found {
error!("Worker has died {:?}, restarting", idx);
let mut new_idx = self.workers.len();
'found: loop {
for i in 0..self.workers.len() {
if self.workers[i].0 == new_idx {
new_idx += 1;
continue 'found;
}
}
break;
}
let (addr, worker) =
self.start_worker(new_idx, self.accept.get_notify());
self.workers.push((new_idx, addr));
self.accept.send(Command::Worker(worker));
}
}
}
}
}
#[derive(Clone, Default)]
///Contains information about connection.
pub struct Connections(Arc<ConnectionsInner>);
impl Connections {
fn new(notify: AcceptNotify, maxconn: usize, maxconnrate: usize) -> Self {
let maxconn_low = if maxconn > 10 { maxconn - 10 } else { 0 };
let maxconnrate_low = if maxconnrate > 10 {
maxconnrate - 10
} else {
0
};
Connections(Arc::new(ConnectionsInner {
notify,
maxconn,
maxconnrate,
maxconn_low,
maxconnrate_low,
conn: AtomicUsize::new(0),
connrate: AtomicUsize::new(0),
}))
}
pub(crate) fn available(&self) -> bool {
self.0.available()
}
pub(crate) fn num_connections(&self) -> usize {
self.0.conn.load(Ordering::Relaxed)
}
/// Report opened connection
pub fn connection(&self) -> ConnectionTag {
ConnectionTag::new(self.0.clone())
}
/// Report rate connection, rate is usually ssl handshake
pub fn connection_rate(&self) -> ConnectionRateTag {
ConnectionRateTag::new(self.0.clone())
}
}
#[derive(Default)]
struct ConnectionsInner {
notify: AcceptNotify,
conn: AtomicUsize,
connrate: AtomicUsize,
maxconn: usize,
maxconnrate: usize,
maxconn_low: usize,
maxconnrate_low: usize,
}
impl ConnectionsInner {
fn available(&self) -> bool {
if self.maxconnrate <= self.connrate.load(Ordering::Relaxed) {
false
} else {
self.maxconn > self.conn.load(Ordering::Relaxed)
}
}
fn notify_maxconn(&self, maxconn: usize) {
if maxconn > self.maxconn_low && maxconn <= self.maxconn {
self.notify.notify();
}
}
fn notify_maxconnrate(&self, connrate: usize) {
if connrate > self.maxconnrate_low && connrate <= self.maxconnrate {
self.notify.notify();
}
}
}
/// Type responsible for max connection stat.
///
/// Max connections stat get updated on drop.
pub struct ConnectionTag(Arc<ConnectionsInner>);
impl ConnectionTag {
fn new(inner: Arc<ConnectionsInner>) -> Self {
inner.conn.fetch_add(1, Ordering::Relaxed);
ConnectionTag(inner)
}
}
impl Drop for ConnectionTag {
fn drop(&mut self) {
let conn = self.0.conn.fetch_sub(1, Ordering::Relaxed);
self.0.notify_maxconn(conn);
}
}
/// Type responsible for max connection rate stat.
///
/// Max connections rate stat get updated on drop.
pub struct ConnectionRateTag(Arc<ConnectionsInner>);
impl ConnectionRateTag {
fn new(inner: Arc<ConnectionsInner>) -> Self {
inner.connrate.fetch_add(1, Ordering::Relaxed);
ConnectionRateTag(inner)
}
}
impl Drop for ConnectionRateTag {
fn drop(&mut self) {
let connrate = self.0.connrate.fetch_sub(1, Ordering::Relaxed);
self.0.notify_maxconnrate(connrate);
}
}

View File

@ -17,7 +17,7 @@ use tokio_timer::{Delay, Interval};
use super::channel::Node; use super::channel::Node;
use super::message::{Request, RequestPool}; use super::message::{Request, RequestPool};
use super::server::{ConnectionRateTag, ConnectionTag, Connections}; // use super::server::{ConnectionRateTag, ConnectionTag, Connections};
use super::KeepAlive; use super::KeepAlive;
use body::Body; use body::Body;
use httpresponse::{HttpResponse, HttpResponseBuilder, HttpResponsePool}; use httpresponse::{HttpResponse, HttpResponseBuilder, HttpResponsePool};
@ -140,7 +140,6 @@ pub(crate) struct WorkerSettings<H> {
ka_enabled: bool, ka_enabled: bool,
bytes: Rc<SharedBytesPool>, bytes: Rc<SharedBytesPool>,
messages: &'static RequestPool, messages: &'static RequestPool,
conns: Connections,
node: RefCell<Node<()>>, node: RefCell<Node<()>>,
date: UnsafeCell<Date>, date: UnsafeCell<Date>,
} }
@ -148,9 +147,8 @@ pub(crate) struct WorkerSettings<H> {
impl<H: 'static> WorkerSettings<H> { impl<H: 'static> WorkerSettings<H> {
pub(crate) fn create( pub(crate) fn create(
apps: Vec<H>, keep_alive: KeepAlive, settings: ServerSettings, apps: Vec<H>, keep_alive: KeepAlive, settings: ServerSettings,
conns: Connections,
) -> Rc<WorkerSettings<H>> { ) -> Rc<WorkerSettings<H>> {
let settings = Rc::new(Self::new(apps, keep_alive, settings, conns)); let settings = Rc::new(Self::new(apps, keep_alive, settings));
// periodic date update // periodic date update
let s = settings.clone(); let s = settings.clone();
@ -169,7 +167,7 @@ impl<H: 'static> WorkerSettings<H> {
impl<H> WorkerSettings<H> { impl<H> WorkerSettings<H> {
pub(crate) fn new( pub(crate) fn new(
h: Vec<H>, keep_alive: KeepAlive, settings: ServerSettings, conns: Connections, h: Vec<H>, keep_alive: KeepAlive, settings: ServerSettings,
) -> WorkerSettings<H> { ) -> WorkerSettings<H> {
let (keep_alive, ka_enabled) = match keep_alive { let (keep_alive, ka_enabled) = match keep_alive {
KeepAlive::Timeout(val) => (val as u64, true), KeepAlive::Timeout(val) => (val as u64, true),
@ -185,7 +183,6 @@ impl<H> WorkerSettings<H> {
date: UnsafeCell::new(Date::new()), date: UnsafeCell::new(Date::new()),
keep_alive, keep_alive,
ka_enabled, ka_enabled,
conns,
} }
} }
@ -227,10 +224,6 @@ impl<H> WorkerSettings<H> {
RequestPool::get(self.messages) RequestPool::get(self.messages)
} }
pub fn connection(&self) -> ConnectionTag {
self.conns.connection()
}
fn update_date(&self) { fn update_date(&self) {
// Unsafe: WorkerSetting is !Sync and !Send // Unsafe: WorkerSetting is !Sync and !Send
unsafe { &mut *self.date.get() }.update(); unsafe { &mut *self.date.get() }.update();
@ -249,11 +242,6 @@ impl<H> WorkerSettings<H> {
dst.extend_from_slice(date_bytes); dst.extend_from_slice(date_bytes);
} }
} }
#[allow(dead_code)]
pub(crate) fn connection_rate(&self) -> ConnectionRateTag {
self.conns.connection_rate()
}
} }
struct Date { struct Date {

View File

@ -1,139 +0,0 @@
use std::{net, time};
use futures::sync::mpsc::{SendError, UnboundedSender};
use futures::sync::oneshot;
use futures::Future;
use actix::msgs::StopArbiter;
use actix::{Actor, Arbiter, AsyncContext, Context, Handler, Message, Response};
use super::server::{Connections, ServiceHandler};
use super::Token;
#[derive(Message)]
pub(crate) struct Conn<T> {
pub io: T,
pub handler: Token,
pub token: Token,
pub peer: Option<net::SocketAddr>,
}
pub(crate) struct Socket {
pub lst: net::TcpListener,
pub addr: net::SocketAddr,
pub token: Token,
}
#[derive(Clone)]
pub(crate) struct WorkerClient {
pub idx: usize,
tx: UnboundedSender<Conn<net::TcpStream>>,
conns: Connections,
}
impl WorkerClient {
pub fn new(
idx: usize, tx: UnboundedSender<Conn<net::TcpStream>>, conns: Connections,
) -> Self {
WorkerClient { idx, tx, conns }
}
pub fn send(
&self, msg: Conn<net::TcpStream>,
) -> Result<(), SendError<Conn<net::TcpStream>>> {
self.tx.unbounded_send(msg)
}
pub fn available(&self) -> bool {
self.conns.available()
}
}
/// Stop worker message. Returns `true` on successful shutdown
/// and `false` if some connections still alive.
pub(crate) struct StopWorker {
pub graceful: Option<time::Duration>,
}
impl Message for StopWorker {
type Result = Result<bool, ()>;
}
/// Http worker
///
/// Worker accepts Socket objects via unbounded channel and start requests
/// processing.
pub(crate) struct Worker {
conns: Connections,
handlers: Vec<Box<ServiceHandler>>,
}
impl Actor for Worker {
type Context = Context<Self>;
}
impl Worker {
pub(crate) fn new(conns: Connections, handlers: Vec<Box<ServiceHandler>>) -> Self {
Worker { conns, handlers }
}
fn shutdown(&self, force: bool) {
self.handlers.iter().for_each(|h| h.shutdown(force));
}
fn shutdown_timeout(
&self, ctx: &mut Context<Worker>, tx: oneshot::Sender<bool>, dur: time::Duration,
) {
// sleep for 1 second and then check again
ctx.run_later(time::Duration::new(1, 0), move |slf, ctx| {
let num = slf.conns.num_connections();
if num == 0 {
let _ = tx.send(true);
Arbiter::current().do_send(StopArbiter(0));
} else if let Some(d) = dur.checked_sub(time::Duration::new(1, 0)) {
slf.shutdown_timeout(ctx, tx, d);
} else {
info!("Force shutdown http worker, {} connections", num);
slf.shutdown(true);
let _ = tx.send(false);
Arbiter::current().do_send(StopArbiter(0));
}
});
}
}
impl Handler<Conn<net::TcpStream>> for Worker {
type Result = ();
fn handle(&mut self, msg: Conn<net::TcpStream>, _: &mut Context<Self>) {
self.handlers[msg.handler.0].handle(msg.token, msg.io, msg.peer)
}
}
/// `StopWorker` message handler
impl Handler<StopWorker> for Worker {
type Result = Response<bool, ()>;
fn handle(&mut self, msg: StopWorker, ctx: &mut Context<Self>) -> Self::Result {
let num = self.conns.num_connections();
if num == 0 {
info!("Shutting down http worker, 0 connections");
Response::reply(Ok(true))
} else if let Some(dur) = msg.graceful {
self.shutdown(false);
let (tx, rx) = oneshot::channel();
let num = self.conns.num_connections();
if num != 0 {
info!("Graceful http worker shutdown, {} connections", num);
self.shutdown_timeout(ctx, tx, dur);
Response::reply(Ok(true))
} else {
Response::async(rx.map_err(|_| ()))
}
} else {
info!("Force shutdown http worker, {} connections", num);
self.shutdown(true);
Response::reply(Ok(false))
}
}
}