diff --git a/Cargo.toml b/Cargo.toml index 59a48a0e9..d4ea4fc1e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,7 +35,7 @@ default = ["session", "brotli", "flate2-c"] tls = ["native-tls", "tokio-tls"] # openssl -alpn = ["openssl", "tokio-openssl"] +alpn = ["openssl", "tokio-openssl", "actix-net/ssl"] # rustls rust-tls = ["rustls", "tokio-rustls", "webpki", "webpki-roots"] @@ -57,6 +57,7 @@ flate2-rust = ["flate2/rust_backend"] [dependencies] actix = "0.7.0" +actix-net = { git="https://github.com/actix/actix-net.git" } base64 = "0.9" bitflags = "1.0" diff --git a/src/lib.rs b/src/lib.rs index 2559f6460..1dfe143ef 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -140,6 +140,8 @@ extern crate serde_urlencoded; extern crate percent_encoding; extern crate serde_json; extern crate smallvec; + +extern crate actix_net; #[macro_use] extern crate actix as actix_inner; diff --git a/src/server/accept.rs b/src/server/accept.rs deleted file mode 100644 index 307a2a2f1..000000000 --- a/src/server/accept.rs +++ /dev/null @@ -1,475 +0,0 @@ -use std::sync::mpsc as sync_mpsc; -use std::time::{Duration, Instant}; -use std::{io, net, thread}; - -use futures::{sync::mpsc, Future}; -use mio; -use slab::Slab; -use tokio_timer::Delay; - -use actix::{msgs::Execute, Arbiter, System}; - -use super::server::ServerCommand; -use super::worker::{Conn, WorkerClient}; -use super::Token; - -pub(crate) enum Command { - Pause, - Resume, - Stop, - Worker(WorkerClient), -} - -struct ServerSocketInfo { - addr: net::SocketAddr, - token: Token, - handler: Token, - sock: mio::net::TcpListener, - timeout: Option, -} - -#[derive(Clone)] -pub(crate) struct AcceptNotify(mio::SetReadiness); - -impl AcceptNotify { - pub(crate) fn new(ready: mio::SetReadiness) -> Self { - AcceptNotify(ready) - } - - pub(crate) fn notify(&self) { - let _ = self.0.set_readiness(mio::Ready::readable()); - } -} - -impl Default for AcceptNotify { - fn default() -> Self { - AcceptNotify::new(mio::Registration::new2().1) - } -} - -pub(crate) struct AcceptLoop { - cmd_reg: Option, - cmd_ready: mio::SetReadiness, - notify_reg: Option, - notify_ready: mio::SetReadiness, - tx: sync_mpsc::Sender, - rx: Option>, - srv: Option<( - mpsc::UnboundedSender, - mpsc::UnboundedReceiver, - )>, -} - -impl AcceptLoop { - pub fn new() -> AcceptLoop { - let (tx, rx) = sync_mpsc::channel(); - let (cmd_reg, cmd_ready) = mio::Registration::new2(); - let (notify_reg, notify_ready) = mio::Registration::new2(); - - AcceptLoop { - tx, - cmd_ready, - cmd_reg: Some(cmd_reg), - notify_ready, - notify_reg: Some(notify_reg), - rx: Some(rx), - srv: Some(mpsc::unbounded()), - } - } - - pub fn send(&self, msg: Command) { - let _ = self.tx.send(msg); - let _ = self.cmd_ready.set_readiness(mio::Ready::readable()); - } - - pub fn get_notify(&self) -> AcceptNotify { - AcceptNotify::new(self.notify_ready.clone()) - } - - pub(crate) fn start( - &mut self, socks: Vec>, - workers: Vec, - ) -> mpsc::UnboundedReceiver { - let (tx, rx) = self.srv.take().expect("Can not re-use AcceptInfo"); - - Accept::start( - self.rx.take().expect("Can not re-use AcceptInfo"), - self.cmd_reg.take().expect("Can not re-use AcceptInfo"), - self.notify_reg.take().expect("Can not re-use AcceptInfo"), - socks, - tx, - workers, - ); - rx - } -} - -struct Accept { - poll: mio::Poll, - rx: sync_mpsc::Receiver, - sockets: Slab, - workers: Vec, - srv: mpsc::UnboundedSender, - timer: (mio::Registration, mio::SetReadiness), - next: usize, - backpressure: bool, -} - -const DELTA: usize = 100; -const CMD: mio::Token = mio::Token(0); -const TIMER: mio::Token = mio::Token(1); -const NOTIFY: mio::Token = mio::Token(2); - -/// This function defines errors that are per-connection. Which basically -/// means that if we get this error from `accept()` system call it means -/// next connection might be ready to be accepted. -/// -/// All other errors will incur a timeout before next `accept()` is performed. -/// The timeout is useful to handle resource exhaustion errors like ENFILE -/// and EMFILE. Otherwise, could enter into tight loop. -fn connection_error(e: &io::Error) -> bool { - e.kind() == io::ErrorKind::ConnectionRefused - || e.kind() == io::ErrorKind::ConnectionAborted - || e.kind() == io::ErrorKind::ConnectionReset -} - -impl Accept { - #![cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))] - pub(crate) fn start( - rx: sync_mpsc::Receiver, cmd_reg: mio::Registration, - notify_reg: mio::Registration, socks: Vec>, - srv: mpsc::UnboundedSender, workers: Vec, - ) { - let sys = System::current(); - - // start accept thread - let _ = thread::Builder::new() - .name("actix-web accept loop".to_owned()) - .spawn(move || { - System::set_current(sys); - let mut accept = Accept::new(rx, socks, workers, srv); - - // Start listening for incoming commands - if let Err(err) = accept.poll.register( - &cmd_reg, - CMD, - mio::Ready::readable(), - mio::PollOpt::edge(), - ) { - panic!("Can not register Registration: {}", err); - } - - // Start listening for notify updates - if let Err(err) = accept.poll.register( - ¬ify_reg, - NOTIFY, - mio::Ready::readable(), - mio::PollOpt::edge(), - ) { - panic!("Can not register Registration: {}", err); - } - - accept.poll(); - }); - } - - fn new( - rx: sync_mpsc::Receiver, socks: Vec>, - workers: Vec, srv: mpsc::UnboundedSender, - ) -> Accept { - // Create a poll instance - let poll = match mio::Poll::new() { - Ok(poll) => poll, - Err(err) => panic!("Can not create mio::Poll: {}", err), - }; - - // Start accept - let mut sockets = Slab::new(); - for (idx, srv_socks) in socks.into_iter().enumerate() { - for (hnd_token, lst) in srv_socks { - let addr = lst.local_addr().unwrap(); - let server = mio::net::TcpListener::from_std(lst) - .expect("Can not create mio::net::TcpListener"); - - let entry = sockets.vacant_entry(); - let token = entry.key(); - - // Start listening for incoming connections - if let Err(err) = poll.register( - &server, - mio::Token(token + DELTA), - mio::Ready::readable(), - mio::PollOpt::edge(), - ) { - panic!("Can not register io: {}", err); - } - - entry.insert(ServerSocketInfo { - addr, - token: hnd_token, - handler: Token(idx), - sock: server, - timeout: None, - }); - } - } - - // Timer - let (tm, tmr) = mio::Registration::new2(); - if let Err(err) = - poll.register(&tm, TIMER, mio::Ready::readable(), mio::PollOpt::edge()) - { - panic!("Can not register Registration: {}", err); - } - - Accept { - poll, - rx, - sockets, - workers, - srv, - next: 0, - timer: (tm, tmr), - backpressure: false, - } - } - - fn poll(&mut self) { - // Create storage for events - let mut events = mio::Events::with_capacity(128); - - loop { - if let Err(err) = self.poll.poll(&mut events, None) { - panic!("Poll error: {}", err); - } - - for event in events.iter() { - let token = event.token(); - match token { - CMD => if !self.process_cmd() { - return; - }, - TIMER => self.process_timer(), - NOTIFY => self.backpressure(false), - _ => { - let token = usize::from(token); - if token < DELTA { - continue; - } - self.accept(token - DELTA); - } - } - } - } - } - - fn process_timer(&mut self) { - let now = Instant::now(); - for (token, info) in self.sockets.iter_mut() { - if let Some(inst) = info.timeout.take() { - if now > inst { - if let Err(err) = self.poll.register( - &info.sock, - mio::Token(token + DELTA), - mio::Ready::readable(), - mio::PollOpt::edge(), - ) { - error!("Can not register server socket {}", err); - } else { - info!("Resume accepting connections on {}", info.addr); - } - } else { - info.timeout = Some(inst); - } - } - } - } - - fn process_cmd(&mut self) -> bool { - loop { - match self.rx.try_recv() { - Ok(cmd) => match cmd { - Command::Pause => { - for (_, info) in self.sockets.iter_mut() { - if let Err(err) = self.poll.deregister(&info.sock) { - error!("Can not deregister server socket {}", err); - } else { - info!("Paused accepting connections on {}", info.addr); - } - } - } - Command::Resume => { - for (token, info) in self.sockets.iter() { - if let Err(err) = self.poll.register( - &info.sock, - mio::Token(token + DELTA), - mio::Ready::readable(), - mio::PollOpt::edge(), - ) { - error!("Can not resume socket accept process: {}", err); - } else { - info!( - "Accepting connections on {} has been resumed", - info.addr - ); - } - } - } - Command::Stop => { - for (_, info) in self.sockets.iter() { - let _ = self.poll.deregister(&info.sock); - } - return false; - } - Command::Worker(worker) => { - self.backpressure(false); - self.workers.push(worker); - } - }, - Err(err) => match err { - sync_mpsc::TryRecvError::Empty => break, - sync_mpsc::TryRecvError::Disconnected => { - for (_, info) in self.sockets.iter() { - let _ = self.poll.deregister(&info.sock); - } - return false; - } - }, - } - } - true - } - - fn backpressure(&mut self, on: bool) { - if self.backpressure { - if !on { - self.backpressure = false; - for (token, info) in self.sockets.iter() { - if let Err(err) = self.poll.register( - &info.sock, - mio::Token(token + DELTA), - mio::Ready::readable(), - mio::PollOpt::edge(), - ) { - error!("Can not resume socket accept process: {}", err); - } else { - info!("Accepting connections on {} has been resumed", info.addr); - } - } - } - } else if on { - self.backpressure = true; - for (_, info) in self.sockets.iter() { - let _ = self.poll.deregister(&info.sock); - } - } - } - - fn accept_one(&mut self, mut msg: Conn) { - if self.backpressure { - while !self.workers.is_empty() { - match self.workers[self.next].send(msg) { - Ok(_) => (), - Err(err) => { - let _ = self.srv.unbounded_send(ServerCommand::WorkerDied( - self.workers[self.next].idx, - )); - msg = err.into_inner(); - self.workers.swap_remove(self.next); - if self.workers.is_empty() { - error!("No workers"); - return; - } else if self.workers.len() <= self.next { - self.next = 0; - } - continue; - } - } - self.next = (self.next + 1) % self.workers.len(); - break; - } - } else { - let mut idx = 0; - while idx < self.workers.len() { - idx += 1; - if self.workers[self.next].available() { - match self.workers[self.next].send(msg) { - Ok(_) => { - self.next = (self.next + 1) % self.workers.len(); - return; - } - Err(err) => { - let _ = self.srv.unbounded_send(ServerCommand::WorkerDied( - self.workers[self.next].idx, - )); - msg = err.into_inner(); - self.workers.swap_remove(self.next); - if self.workers.is_empty() { - error!("No workers"); - self.backpressure(true); - return; - } else if self.workers.len() <= self.next { - self.next = 0; - } - continue; - } - } - } - self.next = (self.next + 1) % self.workers.len(); - } - // enable backpressure - self.backpressure(true); - self.accept_one(msg); - } - } - - fn accept(&mut self, token: usize) { - loop { - let msg = if let Some(info) = self.sockets.get_mut(token) { - match info.sock.accept_std() { - Ok((io, addr)) => Conn { - io, - token: info.token, - handler: info.handler, - peer: Some(addr), - }, - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return, - Err(ref e) if connection_error(e) => continue, - Err(e) => { - error!("Error accepting connection: {}", e); - if let Err(err) = self.poll.deregister(&info.sock) { - error!("Can not deregister server socket {}", err); - } - - // sleep after error - info.timeout = Some(Instant::now() + Duration::from_millis(500)); - - let r = self.timer.1.clone(); - System::current().arbiter().do_send(Execute::new( - move || -> Result<(), ()> { - Arbiter::spawn( - Delay::new( - Instant::now() + Duration::from_millis(510), - ).map_err(|_| ()) - .and_then( - move |_| { - let _ = - r.set_readiness(mio::Ready::readable()); - Ok(()) - }, - ), - ); - Ok(()) - }, - )); - return; - } - } - } else { - return; - }; - - self.accept_one(msg); - } - } -} diff --git a/src/server/channel.rs b/src/server/channel.rs index 3d753f655..d83e9a38e 100644 --- a/src/server/channel.rs +++ b/src/server/channel.rs @@ -8,7 +8,7 @@ use tokio_io::{AsyncRead, AsyncWrite}; use tokio_timer::Delay; use super::settings::WorkerSettings; -use super::{h1, h2, ConnectionTag, HttpHandler, IoStream}; +use super::{h1, h2, HttpHandler, IoStream}; const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0"; @@ -32,7 +32,6 @@ where proto: Option>, node: Option>>, ka_timeout: Option, - _tag: ConnectionTag, } impl HttpChannel @@ -43,11 +42,9 @@ where pub(crate) fn new( settings: Rc>, io: T, peer: Option, ) -> HttpChannel { - let _tag = settings.connection(); let ka_timeout = settings.keep_alive_timer(); HttpChannel { - _tag, ka_timeout, node: None, proto: Some(HttpProtocol::Unknown( diff --git a/src/server/http.rs b/src/server/http.rs index b6f577b02..5059b1326 100644 --- a/src/server/http.rs +++ b/src/server/http.rs @@ -5,29 +5,31 @@ use std::{io, mem, net, time}; use actix::{Actor, Addr, Arbiter, AsyncContext, Context, Handler, System}; -use futures::{Future, Stream}; -use net2::{TcpBuilder, TcpStreamExt}; +use futures::future::{ok, FutureResult}; +use futures::{Async, Poll, Stream}; +use net2::TcpBuilder; use num_cpus; -use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_tcp::TcpStream; -#[cfg(feature = "tls")] -use native_tls::TlsAcceptor; +use actix_net::{ssl, NewService, Service, Server}; + +//#[cfg(feature = "tls")] +//use native_tls::TlsAcceptor; #[cfg(feature = "alpn")] use openssl::ssl::SslAcceptorBuilder; -#[cfg(feature = "rust-tls")] -use rustls::ServerConfig; +//#[cfg(feature = "rust-tls")] +//use rustls::ServerConfig; -use super::channel::{HttpChannel, WrapperStream}; -use super::server::{Connections, Server, Service, ServiceHandler}; +use super::channel::HttpChannel; use super::settings::{ServerSettings, WorkerSettings}; -use super::worker::{Conn, Socket}; -use super::{ - AcceptorService, HttpHandler, IntoAsyncIo, IntoHttpHandler, IoStream, KeepAlive, - Token, -}; +use super::{HttpHandler, IntoHttpHandler, IoStream, KeepAlive}; + +struct Socket { + lst: net::TcpListener, + addr: net::SocketAddr, + handler: Box>, +} /// An HTTP Server /// @@ -49,8 +51,7 @@ where no_signals: bool, maxconn: usize, maxconnrate: usize, - sockets: Vec, - handlers: Vec>>, + sockets: Vec>, } impl HttpServer @@ -75,11 +76,9 @@ where exit: false, no_http2: false, no_signals: false, - maxconn: 102_400, + maxconn: 25_600, maxconnrate: 256, - // settings: None, sockets: Vec::new(), - handlers: Vec::new(), } } @@ -112,7 +111,7 @@ where /// All socket listeners will stop accepting connections when this limit is reached /// for each worker. /// - /// By default max connections is set to a 100k. + /// By default max connections is set to a 25k. pub fn maxconn(mut self, num: usize) -> Self { self.maxconn = num; self @@ -196,9 +195,9 @@ where /// and the user should be presented with an enumeration of which /// socket requires which protocol. pub fn addrs_with_scheme(&self) -> Vec<(net::SocketAddr, &str)> { - self.handlers + self.sockets .iter() - .map(|s| (s.addr(), s.scheme())) + .map(|s| (s.addr, s.handler.scheme())) .collect() } @@ -207,78 +206,82 @@ where /// HttpServer does not change any configuration for TcpListener, /// it needs to be configured before passing it to listen() method. pub fn listen(mut self, lst: net::TcpListener) -> Self { - let token = Token(self.handlers.len()); let addr = lst.local_addr().unwrap(); - self.handlers - .push(Box::new(SimpleHandler::new(lst.local_addr().unwrap()))); - self.sockets.push(Socket { lst, addr, token }); + self.sockets.push(Socket { + lst, + addr, + handler: Box::new(SimpleHandler { + addr, + factory: self.factory.clone(), + }), + }); self } - #[doc(hidden)] - /// Use listener for accepting incoming connection requests - pub fn listen_with(mut self, lst: net::TcpListener, acceptor: A) -> Self - where - A: AcceptorService + Send + 'static, - { - let token = Token(self.handlers.len()); - let addr = lst.local_addr().unwrap(); - self.handlers.push(Box::new(StreamHandler::new( - lst.local_addr().unwrap(), - acceptor, - ))); - self.sockets.push(Socket { lst, addr, token }); + // #[doc(hidden)] + // /// Use listener for accepting incoming connection requests + // pub fn listen_with(mut self, lst: net::TcpListener, acceptor: A) -> Self + // where + // A: AcceptorService + Send + 'static, + // { + // let token = Token(self.handlers.len()); + // let addr = lst.local_addr().unwrap(); + // self.handlers.push(Box::new(StreamHandler::new( + // lst.local_addr().unwrap(), + // acceptor, + // ))); + // self.sockets.push(Socket { lst, addr, token }); - self - } + // self + // } - #[cfg(feature = "tls")] - /// Use listener for accepting incoming tls connection requests - /// - /// HttpServer does not change any configuration for TcpListener, - /// it needs to be configured before passing it to listen() method. - pub fn listen_tls(self, lst: net::TcpListener, acceptor: TlsAcceptor) -> Self { - use super::NativeTlsAcceptor; + // #[cfg(feature = "tls")] + // /// Use listener for accepting incoming tls connection requests + // /// + // /// HttpServer does not change any configuration for TcpListener, + // /// it needs to be configured before passing it to listen() method. + // pub fn listen_tls(self, lst: net::TcpListener, acceptor: TlsAcceptor) -> Self { + // use super::NativeTlsAcceptor; + // + // self.listen_with(lst, NativeTlsAcceptor::new(acceptor)) + // } - self.listen_with(lst, NativeTlsAcceptor::new(acceptor)) - } + // #[cfg(feature = "alpn")] + // /// Use listener for accepting incoming tls connection requests + // /// + // /// This method sets alpn protocols to "h2" and "http/1.1" + // pub fn listen_ssl( + // self, lst: net::TcpListener, builder: SslAcceptorBuilder, + // ) -> io::Result { + // use super::{OpensslAcceptor, ServerFlags}; - #[cfg(feature = "alpn")] - /// Use listener for accepting incoming tls connection requests - /// - /// This method sets alpn protocols to "h2" and "http/1.1" - pub fn listen_ssl( - self, lst: net::TcpListener, builder: SslAcceptorBuilder, - ) -> io::Result { - use super::{OpensslAcceptor, ServerFlags}; + // alpn support + // let flags = if self.no_http2 { + // ServerFlags::HTTP1 + // } else { + // ServerFlags::HTTP1 | ServerFlags::HTTP2 + // }; - // alpn support - let flags = if self.no_http2 { - ServerFlags::HTTP1 - } else { - ServerFlags::HTTP1 | ServerFlags::HTTP2 - }; + // Ok(self.listen_with(lst, OpensslAcceptor::with_flags(builder, flags)?)) + // } - Ok(self.listen_with(lst, OpensslAcceptor::with_flags(builder, flags)?)) - } + // #[cfg(feature = "rust-tls")] + // /// Use listener for accepting incoming tls connection requests + // /// + // /// This method sets alpn protocols to "h2" and "http/1.1" + // pub fn listen_rustls(self, lst: net::TcpListener, builder: ServerConfig) -> Self { + // use super::{RustlsAcceptor, ServerFlags}; - #[cfg(feature = "rust-tls")] - /// Use listener for accepting incoming tls connection requests - /// - /// This method sets alpn protocols to "h2" and "http/1.1" - pub fn listen_rustls(self, lst: net::TcpListener, builder: ServerConfig) -> Self { - use super::{RustlsAcceptor, ServerFlags}; - - // alpn support - let flags = if self.no_http2 { - ServerFlags::HTTP1 - } else { - ServerFlags::HTTP1 | ServerFlags::HTTP2 - }; - - self.listen_with(lst, RustlsAcceptor::with_flags(builder, flags)) - } + // // alpn support + // let flags = if self.no_http2 { + // ServerFlags::HTTP1 + // } else { + // ServerFlags::HTTP1 | ServerFlags::HTTP2 + // }; + // + // self.listen_with(lst, RustlsAcceptor::with_flags(builder, flags)) + // } /// The socket address to bind /// @@ -287,38 +290,34 @@ where let sockets = self.bind2(addr)?; for lst in sockets { - let token = Token(self.handlers.len()); - let addr = lst.local_addr().unwrap(); - self.handlers - .push(Box::new(SimpleHandler::new(lst.local_addr().unwrap()))); - self.sockets.push(Socket { lst, addr, token }) + self = self.listen(lst); } Ok(self) } - /// Start listening for incoming connections with supplied acceptor. - #[doc(hidden)] - #[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))] - pub fn bind_with(mut self, addr: S, acceptor: A) -> io::Result - where - S: net::ToSocketAddrs, - A: AcceptorService + Send + 'static, - { - let sockets = self.bind2(addr)?; + // /// Start listening for incoming connections with supplied acceptor. + // #[doc(hidden)] + // #[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))] + // pub fn bind_with(mut self, addr: S, acceptor: A) -> io::Result + // where + // S: net::ToSocketAddrs, + // A: AcceptorService + Send + 'static, + // { + // let sockets = self.bind2(addr)?; - for lst in sockets { - let token = Token(self.handlers.len()); - let addr = lst.local_addr().unwrap(); - self.handlers.push(Box::new(StreamHandler::new( - lst.local_addr().unwrap(), - acceptor.clone(), - ))); - self.sockets.push(Socket { lst, addr, token }) - } + // for lst in sockets { + // let token = Token(self.handlers.len()); + // let addr = lst.local_addr().unwrap(); + // self.handlers.push(Box::new(StreamHandler::new( + // lst.local_addr().unwrap(), + // acceptor.clone(), + // ))); + // self.sockets.push(Socket { lst, addr, token }) + // } - Ok(self) - } + // Ok(self) + // } fn bind2( &self, addr: S, @@ -350,112 +349,109 @@ where } } - #[cfg(feature = "tls")] - /// The ssl socket address to bind - /// - /// To bind multiple addresses this method can be called multiple times. - pub fn bind_tls( - self, addr: S, acceptor: TlsAcceptor, - ) -> io::Result { - use super::NativeTlsAcceptor; + // #[cfg(feature = "tls")] + // /// The ssl socket address to bind + // /// + // /// To bind multiple addresses this method can be called multiple times. + // pub fn bind_tls( + // self, addr: S, acceptor: TlsAcceptor, + // ) -> io::Result { + // use super::NativeTlsAcceptor; - self.bind_with(addr, NativeTlsAcceptor::new(acceptor)) - } + // self.bind_with(addr, NativeTlsAcceptor::new(acceptor)) + // } - #[cfg(feature = "alpn")] - /// Start listening for incoming tls connections. - /// - /// This method sets alpn protocols to "h2" and "http/1.1" - pub fn bind_ssl(self, addr: S, builder: SslAcceptorBuilder) -> io::Result - where - S: net::ToSocketAddrs, - { - use super::{OpensslAcceptor, ServerFlags}; + // #[cfg(feature = "alpn")] + // /// Start listening for incoming tls connections. + // /// + // /// This method sets alpn protocols to "h2" and "http/1.1" + // pub fn bind_ssl(self, addr: S, builder: SslAcceptorBuilder) -> io::Result + // where + // S: net::ToSocketAddrs, + // { + // use super::{OpensslAcceptor, ServerFlags}; - // alpn support - let flags = if !self.no_http2 { - ServerFlags::HTTP1 - } else { - ServerFlags::HTTP1 | ServerFlags::HTTP2 - }; + // // alpn support + // let flags = if !self.no_http2 { + // ServerFlags::HTTP1 + // } else { + // ServerFlags::HTTP1 | ServerFlags::HTTP2 + // }; - self.bind_with(addr, OpensslAcceptor::with_flags(builder, flags)?) - } + // self.bind_with(addr, OpensslAcceptor::with_flags(builder, flags)?) + // } - #[cfg(feature = "rust-tls")] - /// Start listening for incoming tls connections. - /// - /// This method sets alpn protocols to "h2" and "http/1.1" - pub fn bind_rustls( - self, addr: S, builder: ServerConfig, - ) -> io::Result { - use super::{RustlsAcceptor, ServerFlags}; + // #[cfg(feature = "rust-tls")] + // /// Start listening for incoming tls connections. + // /// + // /// This method sets alpn protocols to "h2" and "http/1.1" + // pub fn bind_rustls( + // self, addr: S, builder: ServerConfig, + // ) -> io::Result { + // use super::{RustlsAcceptor, ServerFlags}; - // alpn support - let flags = if !self.no_http2 { - ServerFlags::HTTP1 - } else { - ServerFlags::HTTP1 | ServerFlags::HTTP2 - }; + // // alpn support + // let flags = if !self.no_http2 { + // ServerFlags::HTTP1 + // } else { + // ServerFlags::HTTP1 | ServerFlags::HTTP2 + // }; - self.bind_with(addr, RustlsAcceptor::with_flags(builder, flags)) - } + // self.bind_with(addr, RustlsAcceptor::with_flags(builder, flags)) + // } } -impl Into<(Box, Vec<(Token, net::TcpListener)>)> - for HttpServer +struct HttpService +where + H: HttpHandler, + F: IntoHttpHandler, + Io: IoStream, { - fn into(mut self) -> (Box, Vec<(Token, net::TcpListener)>) { - let sockets: Vec<_> = mem::replace(&mut self.sockets, Vec::new()) - .into_iter() - .map(|item| (item.token, item.lst)) - .collect(); - - ( - Box::new(HttpService { - factory: self.factory, - host: self.host, - keep_alive: self.keep_alive, - handlers: self.handlers, - }), - sockets, - ) - } -} - -struct HttpService { - factory: Arc Vec + Send + Sync>, + factory: Arc Vec + Send + Sync>, + addr: net::SocketAddr, host: Option, keep_alive: KeepAlive, - handlers: Vec>>, + _t: PhantomData<(H, Io)>, } -impl Service for HttpService { - fn clone(&self) -> Box { - Box::new(HttpService { - factory: self.factory.clone(), - host: self.host.clone(), - keep_alive: self.keep_alive, - handlers: self.handlers.iter().map(|v| v.clone()).collect(), - }) - } +impl NewService for HttpService +where + H: HttpHandler, + F: IntoHttpHandler, + Io: IoStream, +{ + type Request = Io; + type Response = (); + type Error = (); + type InitError = (); + type Service = HttpServiceHandler; + type Future = FutureResult; - fn create(&self, conns: Connections) -> Box { - let addr = self.handlers[0].addr(); - let s = ServerSettings::new(Some(addr), &self.host, false); + fn new_service(&self) -> Self::Future { + let s = ServerSettings::new(Some(self.addr), &self.host, false); let apps: Vec<_> = (*self.factory)() .into_iter() .map(|h| h.into_handler()) .collect(); - let handlers = self.handlers.iter().map(|h| h.clone()).collect(); - Box::new(HttpServiceHandler::new( - apps, - handlers, - self.keep_alive, - s, - conns, - )) + ok(HttpServiceHandler::new(apps, self.keep_alive, s)) + } +} + +impl Clone for HttpService +where + H: HttpHandler, + F: IntoHttpHandler, + Io: IoStream, +{ + fn clone(&self) -> HttpService { + HttpService { + addr: self.addr, + factory: self.factory.clone(), + host: self.host.clone(), + keep_alive: self.keep_alive, + _t: PhantomData, + } } } @@ -485,11 +481,12 @@ impl HttpServer { /// sys.run(); // <- Run actix system, this method starts all async processes /// } /// ``` - pub fn start(self) -> Addr { + pub fn start(mut self) -> Addr { + ssl::max_concurrent_ssl_connect(self.maxconnrate); + let mut srv = Server::new() .workers(self.threads) .maxconn(self.maxconn) - .maxconnrate(self.maxconnrate) .shutdown_timeout(self.shutdown_timeout); srv = if self.exit { srv.system_exit() } else { srv }; @@ -499,7 +496,17 @@ impl HttpServer { srv }; - srv.service(self).start() + let sockets = mem::replace(&mut self.sockets, Vec::new()); + + for socket in sockets { + let Socket { + lst, + addr: _, + handler, + } = socket; + srv = handler.register(srv, lst, self.host.clone(), self.keep_alive); + } + srv.start() } /// Spawn new thread and start listening for incoming connections. @@ -529,277 +536,187 @@ impl HttpServer { } } -impl HttpServer { - /// Start listening for incoming connections from a stream. - /// - /// This method uses only one thread for handling incoming connections. - pub fn start_incoming(self, stream: S, secure: bool) - where - S: Stream + Send + 'static, - T: AsyncRead + AsyncWrite + Send + 'static, - { - // set server settings - let addr: net::SocketAddr = "127.0.0.1:8080".parse().unwrap(); - let srv_settings = ServerSettings::new(Some(addr), &self.host, secure); - let apps: Vec<_> = (*self.factory)() - .into_iter() - .map(|h| h.into_handler()) - .collect(); - let settings = WorkerSettings::create( - apps, - self.keep_alive, - srv_settings, - Connections::default(), - ); +// impl HttpServer { +// /// Start listening for incoming connections from a stream. +// /// +// /// This method uses only one thread for handling incoming connections. +// pub fn start_incoming(self, stream: S, secure: bool) +// where +// S: Stream + Send + 'static, +// T: AsyncRead + AsyncWrite + Send + 'static, +// { +// // set server settings +// let addr: net::SocketAddr = "127.0.0.1:8080".parse().unwrap(); +// let srv_settings = ServerSettings::new(Some(addr), &self.host, secure); +// let apps: Vec<_> = (*self.factory)() +// .into_iter() +// .map(|h| h.into_handler()) +// .collect(); +// let settings = WorkerSettings::create( +// apps, +// self.keep_alive, +// srv_settings, +// ); - // start server - HttpIncoming::create(move |ctx| { - ctx.add_message_stream(stream.map_err(|_| ()).map(move |t| Conn { - io: WrapperStream::new(t), - handler: Token::new(0), - token: Token::new(0), - peer: None, - })); - HttpIncoming { settings } - }); - } -} +// // start server +// HttpIncoming::create(move |ctx| { +// ctx.add_message_stream(stream.map_err(|_| ()).map(move |t| Conn { +// io: WrapperStream::new(t), +// handler: Token::new(0), +// token: Token::new(0), +// peer: None, +// })); +// HttpIncoming { settings } +// }); +// } +// } -struct HttpIncoming { - settings: Rc>, -} +// struct HttpIncoming { +// settings: Rc>, +// } -impl Actor for HttpIncoming +// impl Actor for HttpIncoming +// where +// H: HttpHandler, +// { +// type Context = Context; +// } + +// impl Handler> for HttpIncoming +// where +// T: IoStream, +// H: HttpHandler, +// { +// type Result = (); + +// fn handle(&mut self, msg: Conn, _: &mut Context) -> Self::Result { +// spawn(HttpChannel::new( +// Rc::clone(&self.settings), +// msg.io, +// msg.peer, +// )); +// } +// } + +struct HttpServiceHandler where H: HttpHandler, -{ - type Context = Context; -} - -impl Handler> for HttpIncoming -where - T: IoStream, - H: HttpHandler, -{ - type Result = (); - - fn handle(&mut self, msg: Conn, _: &mut Context) -> Self::Result { - Arbiter::spawn(HttpChannel::new( - Rc::clone(&self.settings), - msg.io, - msg.peer, - )); - } -} - -struct HttpServiceHandler -where - H: HttpHandler + 'static, + Io: IoStream, { settings: Rc>, - handlers: Vec>>, tcp_ka: Option, + _t: PhantomData, } -impl HttpServiceHandler { +impl HttpServiceHandler +where + H: HttpHandler, + Io: IoStream, +{ fn new( - apps: Vec, handlers: Vec>>, - keep_alive: KeepAlive, settings: ServerSettings, conns: Connections, - ) -> HttpServiceHandler { + apps: Vec, keep_alive: KeepAlive, settings: ServerSettings, + ) -> HttpServiceHandler { let tcp_ka = if let KeepAlive::Tcp(val) = keep_alive { Some(time::Duration::new(val as u64, 0)) } else { None }; - let settings = WorkerSettings::create(apps, keep_alive, settings, conns); + let settings = WorkerSettings::create(apps, keep_alive, settings); HttpServiceHandler { - handlers, tcp_ka, settings, + _t: PhantomData, } } } -impl ServiceHandler for HttpServiceHandler +impl Service for HttpServiceHandler where - H: HttpHandler + 'static, + H: HttpHandler, + Io: IoStream, { - fn handle( - &mut self, token: Token, io: net::TcpStream, peer: Option, - ) { - if self.tcp_ka.is_some() && io.set_keepalive(self.tcp_ka).is_err() { - error!("Can not set socket keep-alive option"); - } - self.handlers[token.0].handle(Rc::clone(&self.settings), io, peer); + type Request = Io; + type Response = (); + type Error = (); + type Future = HttpChannel; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(Async::Ready(())) } - fn shutdown(&self, force: bool) { - if force { - self.settings - .head() - .traverse(|ch: &mut HttpChannel| ch.shutdown()); - } + fn call(&mut self, mut req: Self::Request) -> Self::Future { + let _ = req.set_nodelay(true); + HttpChannel::new(Rc::clone(&self.settings), req, None) } + + // fn shutdown(&self, force: bool) { + // if force { + // self.settings.head().traverse::(); + // } + // } } -struct SimpleHandler { - addr: net::SocketAddr, - io: PhantomData, +trait IoStreamHandler: Send +where + H: IntoHttpHandler, +{ + fn addr(&self) -> net::SocketAddr; + + fn scheme(&self) -> &'static str; + + fn register( + &self, server: Server, lst: net::TcpListener, host: Option, + keep_alive: KeepAlive, + ) -> Server; } -impl Clone for SimpleHandler { +struct SimpleHandler +where + H: IntoHttpHandler, +{ + pub addr: net::SocketAddr, + pub factory: Arc Vec + Send + Sync>, +} + +impl Clone for SimpleHandler { fn clone(&self) -> Self { SimpleHandler { addr: self.addr, - io: PhantomData, + factory: self.factory.clone(), } } } -impl SimpleHandler { - fn new(addr: net::SocketAddr) -> Self { - SimpleHandler { - addr, - io: PhantomData, - } - } -} - -impl IoStreamHandler for SimpleHandler +impl IoStreamHandler for SimpleHandler where - H: HttpHandler, - Io: IntoAsyncIo + Send + 'static, - Io::Io: IoStream, + H: IntoHttpHandler + 'static, { fn addr(&self) -> net::SocketAddr { self.addr } - fn clone(&self) -> Box> { - Box::new(Clone::clone(self)) - } - fn scheme(&self) -> &'static str { "http" } - fn handle(&self, h: Rc>, io: Io, peer: Option) { - let mut io = match io.into_async_io() { - Ok(io) => io, - Err(err) => { - trace!("Failed to create async io: {}", err); - return; - } - }; - let _ = io.set_nodelay(true); + fn register( + &self, server: Server, lst: net::TcpListener, host: Option, + keep_alive: KeepAlive, + ) -> Server { + let addr = self.addr; + let factory = self.factory.clone(); - Arbiter::spawn(HttpChannel::new(h, io, peer)); - } -} - -struct StreamHandler { - acceptor: A, - addr: net::SocketAddr, - io: PhantomData, -} - -impl> StreamHandler { - fn new(addr: net::SocketAddr, acceptor: A) -> Self { - StreamHandler { + server.listen(lst, move || HttpService { + keep_alive, addr, - acceptor, - io: PhantomData, - } + host: host.clone(), + factory: factory.clone(), + _t: PhantomData, + }) } } -impl> Clone for StreamHandler { - fn clone(&self) -> Self { - StreamHandler { - addr: self.addr, - acceptor: self.acceptor.clone(), - io: PhantomData, - } - } -} - -impl IoStreamHandler for StreamHandler -where - H: HttpHandler, - Io: IntoAsyncIo + Send + 'static, - Io::Io: IoStream, - A: AcceptorService + Send + 'static, -{ - fn addr(&self) -> net::SocketAddr { - self.addr - } - - fn clone(&self) -> Box> { - Box::new(Clone::clone(self)) - } - - fn scheme(&self) -> &'static str { - self.acceptor.scheme() - } - - fn handle(&self, h: Rc>, io: Io, peer: Option) { - let mut io = match io.into_async_io() { - Ok(io) => io, - Err(err) => { - trace!("Failed to create async io: {}", err); - return; - } - }; - let _ = io.set_nodelay(true); - - let rate = h.connection_rate(); - Arbiter::spawn(self.acceptor.accept(io).then(move |res| { - drop(rate); - match res { - Ok(io) => Arbiter::spawn(HttpChannel::new(h, io, peer)), - Err(err) => trace!("Can not establish connection: {}", err), - } - Ok(()) - })) - } -} - -impl IoStreamHandler for Box> -where - H: HttpHandler, - Io: IntoAsyncIo, -{ - fn addr(&self) -> net::SocketAddr { - self.as_ref().addr() - } - - fn clone(&self) -> Box> { - self.as_ref().clone() - } - - fn scheme(&self) -> &'static str { - self.as_ref().scheme() - } - - fn handle(&self, h: Rc>, io: Io, peer: Option) { - self.as_ref().handle(h, io, peer) - } -} - -trait IoStreamHandler: Send -where - H: HttpHandler, -{ - fn clone(&self) -> Box>; - - fn addr(&self) -> net::SocketAddr; - - fn scheme(&self) -> &'static str; - - fn handle(&self, h: Rc>, io: Io, peer: Option); -} - fn create_tcp_listener( addr: net::SocketAddr, backlog: i32, ) -> io::Result { diff --git a/src/server/mod.rs b/src/server/mod.rs index 96ec570a1..25eca3a71 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -108,15 +108,13 @@ //! ``` use std::net::Shutdown; use std::rc::Rc; -use std::{io, net, time}; +use std::{io, time}; use bytes::{BufMut, BytesMut}; -use futures::{Async, Future, Poll}; +use futures::{Async, Poll}; use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_reactor::Handle; use tokio_tcp::TcpStream; -pub(crate) mod accept; mod channel; mod error; pub(crate) mod h1; @@ -129,25 +127,15 @@ mod http; pub(crate) mod input; pub(crate) mod message; pub(crate) mod output; -mod server; pub(crate) mod settings; mod ssl; -mod worker; use actix::Message; -pub use self::message::Request; - pub use self::http::HttpServer; -#[doc(hidden)] -pub use self::server::{ - ConnectionRateTag, ConnectionTag, Connections, Server, Service, ServiceHandler, -}; +pub use self::message::Request; pub use self::settings::ServerSettings; -#[doc(hidden)] -pub use self::ssl::*; - #[doc(hidden)] pub use self::helpers::write_content_length; @@ -322,35 +310,6 @@ impl IntoHttpHandler for T { } } -pub(crate) trait IntoAsyncIo { - type Io: AsyncRead + AsyncWrite; - - fn into_async_io(self) -> Result; -} - -impl IntoAsyncIo for net::TcpStream { - type Io = TcpStream; - - fn into_async_io(self) -> Result { - TcpStream::from_std(self, &Handle::default()) - } -} - -#[doc(hidden)] -/// Trait implemented by types that could accept incomming socket connections. -pub trait AcceptorService: Clone { - /// Established connection type - type Accepted: IoStream; - /// Future describes async accept process. - type Future: Future + 'static; - - /// Establish new connection - fn accept(&self, io: Io) -> Self::Future; - - /// Scheme - fn scheme(&self) -> &'static str; -} - #[doc(hidden)] #[derive(Debug)] pub enum WriterState { diff --git a/src/server/server.rs b/src/server/server.rs deleted file mode 100644 index 122571fd1..000000000 --- a/src/server/server.rs +++ /dev/null @@ -1,528 +0,0 @@ -use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, -}; -use std::time::Duration; -use std::{mem, net}; - -use futures::sync::{mpsc, mpsc::unbounded}; -use futures::{Future, Sink, Stream}; -use num_cpus; - -use actix::{ - fut, signal, Actor, ActorFuture, Addr, Arbiter, AsyncContext, Context, Handler, - Response, StreamHandler, System, WrapFuture, -}; - -use super::accept::{AcceptLoop, AcceptNotify, Command}; -use super::worker::{Conn, StopWorker, Worker, WorkerClient}; -use super::{PauseServer, ResumeServer, StopServer, Token}; - -#[doc(hidden)] -/// Describes service that could be used -/// with [Server](struct.Server.html) -pub trait Service: Send + 'static { - /// Clone service - fn clone(&self) -> Box; - - /// Create service handler for this service - fn create(&self, conn: Connections) -> Box; -} - -impl Service for Box { - fn clone(&self) -> Box { - self.as_ref().clone() - } - - fn create(&self, conn: Connections) -> Box { - self.as_ref().create(conn) - } -} - -#[doc(hidden)] -/// Describes the way serivce handles incoming -/// TCP connections. -pub trait ServiceHandler { - /// Handle incoming stream - fn handle( - &mut self, token: Token, io: net::TcpStream, peer: Option, - ); - - /// Shutdown open handlers - fn shutdown(&self, _: bool) {} -} - -pub(crate) enum ServerCommand { - WorkerDied(usize), -} - -/// Generic server -#[doc(hidden)] -pub struct Server { - threads: usize, - workers: Vec<(usize, Addr)>, - services: Vec>, - sockets: Vec>, - accept: AcceptLoop, - exit: bool, - shutdown_timeout: u16, - signals: Option>, - no_signals: bool, - maxconn: usize, - maxconnrate: usize, -} - -impl Default for Server { - fn default() -> Self { - Self::new() - } -} - -impl Server { - /// Create new Server instance - pub fn new() -> Server { - Server { - threads: num_cpus::get(), - workers: Vec::new(), - services: Vec::new(), - sockets: Vec::new(), - accept: AcceptLoop::new(), - exit: false, - shutdown_timeout: 30, - signals: None, - no_signals: false, - maxconn: 102_400, - maxconnrate: 256, - } - } - - /// Set number of workers to start. - /// - /// By default http server uses number of available logical cpu as threads - /// count. - pub fn workers(mut self, num: usize) -> Self { - self.threads = num; - self - } - - /// Sets the maximum per-worker number of concurrent connections. - /// - /// All socket listeners will stop accepting connections when this limit is reached - /// for each worker. - /// - /// By default max connections is set to a 100k. - pub fn maxconn(mut self, num: usize) -> Self { - self.maxconn = num; - self - } - - /// Sets the maximum per-worker concurrent connection establish process. - /// - /// All listeners will stop accepting connections when this limit is reached. It - /// can be used to limit the global SSL CPU usage. - /// - /// By default max connections is set to a 256. - pub fn maxconnrate(mut self, num: usize) -> Self { - self.maxconnrate = num; - self - } - - /// Stop actix system. - /// - /// `SystemExit` message stops currently running system. - pub fn system_exit(mut self) -> Self { - self.exit = true; - self - } - - #[doc(hidden)] - /// Set alternative address for `ProcessSignals` actor. - pub fn signals(mut self, addr: Addr) -> Self { - self.signals = Some(addr); - self - } - - /// Disable signal handling - pub fn disable_signals(mut self) -> Self { - self.no_signals = true; - self - } - - /// Timeout for graceful workers shutdown. - /// - /// After receiving a stop signal, workers have this much time to finish - /// serving requests. Workers still alive after the timeout are force - /// dropped. - /// - /// By default shutdown timeout sets to 30 seconds. - pub fn shutdown_timeout(mut self, sec: u16) -> Self { - self.shutdown_timeout = sec; - self - } - - /// Add new service to server - pub fn service(mut self, srv: T) -> Self - where - T: Into<(Box, Vec<(Token, net::TcpListener)>)>, - { - let (srv, sockets) = srv.into(); - self.services.push(srv); - self.sockets.push(sockets); - self - } - - /// Spawn new thread and start listening for incoming connections. - /// - /// This method spawns new thread and starts new actix system. Other than - /// that it is similar to `start()` method. This method blocks. - /// - /// This methods panics if no socket addresses get bound. - /// - /// ```rust,ignore - /// # extern crate futures; - /// # extern crate actix_web; - /// # use futures::Future; - /// use actix_web::*; - /// - /// fn main() { - /// Server::new(). - /// .service( - /// HttpServer::new(|| App::new().resource("/", |r| r.h(|_| HttpResponse::Ok()))) - /// .bind("127.0.0.1:0") - /// .expect("Can not bind to 127.0.0.1:0")) - /// .run(); - /// } - /// ``` - pub fn run(self) { - let sys = System::new("http-server"); - self.start(); - sys.run(); - } - - /// Starts Server Actor and returns its address - pub fn start(mut self) -> Addr { - if self.sockets.is_empty() { - panic!("Service should have at least one bound socket"); - } else { - info!("Starting {} http workers", self.threads); - - // start workers - let mut workers = Vec::new(); - for idx in 0..self.threads { - let (addr, worker) = self.start_worker(idx, self.accept.get_notify()); - workers.push(worker); - self.workers.push((idx, addr)); - } - - // start accept thread - for sock in &self.sockets { - for s in sock.iter() { - info!("Starting server on http://{}", s.1.local_addr().unwrap()); - } - } - let rx = self - .accept - .start(mem::replace(&mut self.sockets, Vec::new()), workers); - - // start http server actor - let signals = self.subscribe_to_signals(); - let addr = Actor::create(move |ctx| { - ctx.add_stream(rx); - self - }); - if let Some(signals) = signals { - signals.do_send(signal::Subscribe(addr.clone().recipient())) - } - addr - } - } - - // subscribe to os signals - fn subscribe_to_signals(&self) -> Option> { - if !self.no_signals { - if let Some(ref signals) = self.signals { - Some(signals.clone()) - } else { - Some(System::current().registry().get::()) - } - } else { - None - } - } - - fn start_worker( - &self, idx: usize, notify: AcceptNotify, - ) -> (Addr, WorkerClient) { - let (tx, rx) = unbounded::>(); - let conns = Connections::new(notify, self.maxconn, self.maxconnrate); - let worker = WorkerClient::new(idx, tx, conns.clone()); - let services: Vec<_> = self.services.iter().map(|v| v.clone()).collect(); - - let addr = Arbiter::start(move |ctx: &mut Context<_>| { - ctx.add_message_stream(rx); - let handlers: Vec<_> = services - .into_iter() - .map(|s| s.create(conns.clone())) - .collect(); - Worker::new(conns, handlers) - }); - - (addr, worker) - } -} - -impl Actor for Server { - type Context = Context; -} - -/// Signals support -/// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system -/// message to `System` actor. -impl Handler for Server { - type Result = (); - - fn handle(&mut self, msg: signal::Signal, ctx: &mut Context) { - match msg.0 { - signal::SignalType::Int => { - info!("SIGINT received, exiting"); - self.exit = true; - Handler::::handle(self, StopServer { graceful: false }, ctx); - } - signal::SignalType::Term => { - info!("SIGTERM received, stopping"); - self.exit = true; - Handler::::handle(self, StopServer { graceful: true }, ctx); - } - signal::SignalType::Quit => { - info!("SIGQUIT received, exiting"); - self.exit = true; - Handler::::handle(self, StopServer { graceful: false }, ctx); - } - _ => (), - } - } -} - -impl Handler for Server { - type Result = (); - - fn handle(&mut self, _: PauseServer, _: &mut Context) { - self.accept.send(Command::Pause); - } -} - -impl Handler for Server { - type Result = (); - - fn handle(&mut self, _: ResumeServer, _: &mut Context) { - self.accept.send(Command::Resume); - } -} - -impl Handler for Server { - type Result = Response<(), ()>; - - fn handle(&mut self, msg: StopServer, ctx: &mut Context) -> Self::Result { - // stop accept thread - self.accept.send(Command::Stop); - - // stop workers - let (tx, rx) = mpsc::channel(1); - - let dur = if msg.graceful { - Some(Duration::new(u64::from(self.shutdown_timeout), 0)) - } else { - None - }; - for worker in &self.workers { - let tx2 = tx.clone(); - ctx.spawn( - worker - .1 - .send(StopWorker { graceful: dur }) - .into_actor(self) - .then(move |_, slf, ctx| { - slf.workers.pop(); - if slf.workers.is_empty() { - let _ = tx2.send(()); - - // we need to stop system if server was spawned - if slf.exit { - ctx.run_later(Duration::from_millis(300), |_, _| { - System::current().stop(); - }); - } - } - - fut::ok(()) - }), - ); - } - - if !self.workers.is_empty() { - Response::async(rx.into_future().map(|_| ()).map_err(|_| ())) - } else { - // we need to stop system if server was spawned - if self.exit { - ctx.run_later(Duration::from_millis(300), |_, _| { - System::current().stop(); - }); - } - Response::reply(Ok(())) - } - } -} - -/// Commands from accept threads -impl StreamHandler for Server { - fn finished(&mut self, _: &mut Context) {} - - fn handle(&mut self, msg: ServerCommand, _: &mut Context) { - match msg { - ServerCommand::WorkerDied(idx) => { - let mut found = false; - for i in 0..self.workers.len() { - if self.workers[i].0 == idx { - self.workers.swap_remove(i); - found = true; - break; - } - } - - if found { - error!("Worker has died {:?}, restarting", idx); - - let mut new_idx = self.workers.len(); - 'found: loop { - for i in 0..self.workers.len() { - if self.workers[i].0 == new_idx { - new_idx += 1; - continue 'found; - } - } - break; - } - - let (addr, worker) = - self.start_worker(new_idx, self.accept.get_notify()); - self.workers.push((new_idx, addr)); - self.accept.send(Command::Worker(worker)); - } - } - } - } -} - -#[derive(Clone, Default)] -///Contains information about connection. -pub struct Connections(Arc); - -impl Connections { - fn new(notify: AcceptNotify, maxconn: usize, maxconnrate: usize) -> Self { - let maxconn_low = if maxconn > 10 { maxconn - 10 } else { 0 }; - let maxconnrate_low = if maxconnrate > 10 { - maxconnrate - 10 - } else { - 0 - }; - - Connections(Arc::new(ConnectionsInner { - notify, - maxconn, - maxconnrate, - maxconn_low, - maxconnrate_low, - conn: AtomicUsize::new(0), - connrate: AtomicUsize::new(0), - })) - } - - pub(crate) fn available(&self) -> bool { - self.0.available() - } - - pub(crate) fn num_connections(&self) -> usize { - self.0.conn.load(Ordering::Relaxed) - } - - /// Report opened connection - pub fn connection(&self) -> ConnectionTag { - ConnectionTag::new(self.0.clone()) - } - - /// Report rate connection, rate is usually ssl handshake - pub fn connection_rate(&self) -> ConnectionRateTag { - ConnectionRateTag::new(self.0.clone()) - } -} - -#[derive(Default)] -struct ConnectionsInner { - notify: AcceptNotify, - conn: AtomicUsize, - connrate: AtomicUsize, - maxconn: usize, - maxconnrate: usize, - maxconn_low: usize, - maxconnrate_low: usize, -} - -impl ConnectionsInner { - fn available(&self) -> bool { - if self.maxconnrate <= self.connrate.load(Ordering::Relaxed) { - false - } else { - self.maxconn > self.conn.load(Ordering::Relaxed) - } - } - - fn notify_maxconn(&self, maxconn: usize) { - if maxconn > self.maxconn_low && maxconn <= self.maxconn { - self.notify.notify(); - } - } - - fn notify_maxconnrate(&self, connrate: usize) { - if connrate > self.maxconnrate_low && connrate <= self.maxconnrate { - self.notify.notify(); - } - } -} - -/// Type responsible for max connection stat. -/// -/// Max connections stat get updated on drop. -pub struct ConnectionTag(Arc); - -impl ConnectionTag { - fn new(inner: Arc) -> Self { - inner.conn.fetch_add(1, Ordering::Relaxed); - ConnectionTag(inner) - } -} - -impl Drop for ConnectionTag { - fn drop(&mut self) { - let conn = self.0.conn.fetch_sub(1, Ordering::Relaxed); - self.0.notify_maxconn(conn); - } -} - -/// Type responsible for max connection rate stat. -/// -/// Max connections rate stat get updated on drop. -pub struct ConnectionRateTag(Arc); - -impl ConnectionRateTag { - fn new(inner: Arc) -> Self { - inner.connrate.fetch_add(1, Ordering::Relaxed); - ConnectionRateTag(inner) - } -} - -impl Drop for ConnectionRateTag { - fn drop(&mut self) { - let connrate = self.0.connrate.fetch_sub(1, Ordering::Relaxed); - self.0.notify_maxconnrate(connrate); - } -} diff --git a/src/server/settings.rs b/src/server/settings.rs index fc0d931f0..2ca0b9b95 100644 --- a/src/server/settings.rs +++ b/src/server/settings.rs @@ -17,7 +17,7 @@ use tokio_timer::{Delay, Interval}; use super::channel::Node; use super::message::{Request, RequestPool}; -use super::server::{ConnectionRateTag, ConnectionTag, Connections}; +// use super::server::{ConnectionRateTag, ConnectionTag, Connections}; use super::KeepAlive; use body::Body; use httpresponse::{HttpResponse, HttpResponseBuilder, HttpResponsePool}; @@ -140,7 +140,6 @@ pub(crate) struct WorkerSettings { ka_enabled: bool, bytes: Rc, messages: &'static RequestPool, - conns: Connections, node: RefCell>, date: UnsafeCell, } @@ -148,9 +147,8 @@ pub(crate) struct WorkerSettings { impl WorkerSettings { pub(crate) fn create( apps: Vec, keep_alive: KeepAlive, settings: ServerSettings, - conns: Connections, ) -> Rc> { - let settings = Rc::new(Self::new(apps, keep_alive, settings, conns)); + let settings = Rc::new(Self::new(apps, keep_alive, settings)); // periodic date update let s = settings.clone(); @@ -169,7 +167,7 @@ impl WorkerSettings { impl WorkerSettings { pub(crate) fn new( - h: Vec, keep_alive: KeepAlive, settings: ServerSettings, conns: Connections, + h: Vec, keep_alive: KeepAlive, settings: ServerSettings, ) -> WorkerSettings { let (keep_alive, ka_enabled) = match keep_alive { KeepAlive::Timeout(val) => (val as u64, true), @@ -185,7 +183,6 @@ impl WorkerSettings { date: UnsafeCell::new(Date::new()), keep_alive, ka_enabled, - conns, } } @@ -227,10 +224,6 @@ impl WorkerSettings { RequestPool::get(self.messages) } - pub fn connection(&self) -> ConnectionTag { - self.conns.connection() - } - fn update_date(&self) { // Unsafe: WorkerSetting is !Sync and !Send unsafe { &mut *self.date.get() }.update(); @@ -249,11 +242,6 @@ impl WorkerSettings { dst.extend_from_slice(date_bytes); } } - - #[allow(dead_code)] - pub(crate) fn connection_rate(&self) -> ConnectionRateTag { - self.conns.connection_rate() - } } struct Date { diff --git a/src/server/worker.rs b/src/server/worker.rs deleted file mode 100644 index 77128adc0..000000000 --- a/src/server/worker.rs +++ /dev/null @@ -1,139 +0,0 @@ -use std::{net, time}; - -use futures::sync::mpsc::{SendError, UnboundedSender}; -use futures::sync::oneshot; -use futures::Future; - -use actix::msgs::StopArbiter; -use actix::{Actor, Arbiter, AsyncContext, Context, Handler, Message, Response}; - -use super::server::{Connections, ServiceHandler}; -use super::Token; - -#[derive(Message)] -pub(crate) struct Conn { - pub io: T, - pub handler: Token, - pub token: Token, - pub peer: Option, -} - -pub(crate) struct Socket { - pub lst: net::TcpListener, - pub addr: net::SocketAddr, - pub token: Token, -} - -#[derive(Clone)] -pub(crate) struct WorkerClient { - pub idx: usize, - tx: UnboundedSender>, - conns: Connections, -} - -impl WorkerClient { - pub fn new( - idx: usize, tx: UnboundedSender>, conns: Connections, - ) -> Self { - WorkerClient { idx, tx, conns } - } - - pub fn send( - &self, msg: Conn, - ) -> Result<(), SendError>> { - self.tx.unbounded_send(msg) - } - - pub fn available(&self) -> bool { - self.conns.available() - } -} - -/// Stop worker message. Returns `true` on successful shutdown -/// and `false` if some connections still alive. -pub(crate) struct StopWorker { - pub graceful: Option, -} - -impl Message for StopWorker { - type Result = Result; -} - -/// Http worker -/// -/// Worker accepts Socket objects via unbounded channel and start requests -/// processing. -pub(crate) struct Worker { - conns: Connections, - handlers: Vec>, -} - -impl Actor for Worker { - type Context = Context; -} - -impl Worker { - pub(crate) fn new(conns: Connections, handlers: Vec>) -> Self { - Worker { conns, handlers } - } - - fn shutdown(&self, force: bool) { - self.handlers.iter().for_each(|h| h.shutdown(force)); - } - - fn shutdown_timeout( - &self, ctx: &mut Context, tx: oneshot::Sender, dur: time::Duration, - ) { - // sleep for 1 second and then check again - ctx.run_later(time::Duration::new(1, 0), move |slf, ctx| { - let num = slf.conns.num_connections(); - if num == 0 { - let _ = tx.send(true); - Arbiter::current().do_send(StopArbiter(0)); - } else if let Some(d) = dur.checked_sub(time::Duration::new(1, 0)) { - slf.shutdown_timeout(ctx, tx, d); - } else { - info!("Force shutdown http worker, {} connections", num); - slf.shutdown(true); - let _ = tx.send(false); - Arbiter::current().do_send(StopArbiter(0)); - } - }); - } -} - -impl Handler> for Worker { - type Result = (); - - fn handle(&mut self, msg: Conn, _: &mut Context) { - self.handlers[msg.handler.0].handle(msg.token, msg.io, msg.peer) - } -} - -/// `StopWorker` message handler -impl Handler for Worker { - type Result = Response; - - fn handle(&mut self, msg: StopWorker, ctx: &mut Context) -> Self::Result { - let num = self.conns.num_connections(); - if num == 0 { - info!("Shutting down http worker, 0 connections"); - Response::reply(Ok(true)) - } else if let Some(dur) = msg.graceful { - self.shutdown(false); - let (tx, rx) = oneshot::channel(); - let num = self.conns.num_connections(); - if num != 0 { - info!("Graceful http worker shutdown, {} connections", num); - self.shutdown_timeout(ctx, tx, dur); - Response::reply(Ok(true)) - } else { - Response::async(rx.map_err(|_| ())) - } - } else { - info!("Force shutdown http worker, {} connections", num); - self.shutdown(true); - Response::reply(Ok(false)) - } - } -}