diff --git a/src/server/accept.rs b/src/server/accept.rs new file mode 100644 index 00000000..a91ca814 --- /dev/null +++ b/src/server/accept.rs @@ -0,0 +1,207 @@ +use std::sync::mpsc as sync_mpsc; +use std::time::Duration; +use std::{io, net, thread}; + +use futures::sync::mpsc; +use mio; +use slab::Slab; + +#[cfg(feature = "tls")] +use native_tls::TlsAcceptor; + +#[cfg(feature = "alpn")] +use openssl::ssl::{AlpnError, SslAcceptorBuilder}; + +#[cfg(feature = "rust-tls")] +use rustls::ServerConfig; + +use super::srv::{ServerCommand, Socket}; +use super::worker::{Conn, SocketInfo}; + +pub(crate) enum Command { + Pause, + Resume, + Stop, + Worker(usize, mpsc::UnboundedSender>), +} + +pub(crate) fn start_accept_thread( + token: usize, sock: Socket, srv: mpsc::UnboundedSender, + socks: Slab, + mut workers: Vec<(usize, mpsc::UnboundedSender>)>, +) -> (mio::SetReadiness, sync_mpsc::Sender) { + let (tx, rx) = sync_mpsc::channel(); + let (reg, readiness) = mio::Registration::new2(); + + // start accept thread + #[cfg_attr(feature = "cargo-clippy", allow(cyclomatic_complexity))] + let _ = thread::Builder::new() + .name(format!("Accept on {}", sock.addr)) + .spawn(move || { + const SRV: mio::Token = mio::Token(0); + const CMD: mio::Token = mio::Token(1); + + let addr = sock.addr; + let mut server = Some( + mio::net::TcpListener::from_std(sock.lst) + .expect("Can not create mio::net::TcpListener"), + ); + + // Create a poll instance + let poll = match mio::Poll::new() { + Ok(poll) => poll, + Err(err) => panic!("Can not create mio::Poll: {}", err), + }; + + // Start listening for incoming connections + if let Some(ref srv) = server { + if let Err(err) = + poll.register(srv, SRV, mio::Ready::readable(), mio::PollOpt::edge()) + { + panic!("Can not register io: {}", err); + } + } + + // Start listening for incoming commands + if let Err(err) = + poll.register(®, CMD, mio::Ready::readable(), mio::PollOpt::edge()) + { + panic!("Can not register Registration: {}", err); + } + + // Create storage for events + let mut events = mio::Events::with_capacity(128); + + // Sleep on error + let sleep = Duration::from_millis(100); + + let mut next = 0; + loop { + if let Err(err) = poll.poll(&mut events, None) { + panic!("Poll error: {}", err); + } + + for event in events.iter() { + match event.token() { + SRV => if let Some(ref server) = server { + loop { + match server.accept_std() { + Ok((io, addr)) => { + let mut msg = Conn { + io, + token, + peer: Some(addr), + http2: false, + }; + while !workers.is_empty() { + match workers[next].1.unbounded_send(msg) { + Ok(_) => (), + Err(err) => { + let _ = srv.unbounded_send( + ServerCommand::WorkerDied( + workers[next].0, + socks.clone(), + ), + ); + msg = err.into_inner(); + workers.swap_remove(next); + if workers.is_empty() { + error!("No workers"); + thread::sleep(sleep); + break; + } else if workers.len() <= next { + next = 0; + } + continue; + } + } + next = (next + 1) % workers.len(); + break; + } + } + Err(ref e) + if e.kind() == io::ErrorKind::WouldBlock => + { + break + } + Err(ref e) if connection_error(e) => continue, + Err(e) => { + error!("Error accepting connection: {}", e); + // sleep after error + thread::sleep(sleep); + break; + } + } + } + }, + CMD => match rx.try_recv() { + Ok(cmd) => match cmd { + Command::Pause => if let Some(ref server) = server { + if let Err(err) = poll.deregister(server) { + error!( + "Can not deregister server socket {}", + err + ); + } else { + info!( + "Paused accepting connections on {}", + addr + ); + } + }, + Command::Resume => { + if let Some(ref server) = server { + if let Err(err) = poll.register( + server, + SRV, + mio::Ready::readable(), + mio::PollOpt::edge(), + ) { + error!("Can not resume socket accept process: {}", err); + } else { + info!("Accepting connections on {} has been resumed", + addr); + } + } + } + Command::Stop => { + if let Some(server) = server.take() { + let _ = poll.deregister(&server); + } + return; + } + Command::Worker(idx, addr) => { + workers.push((idx, addr)); + } + }, + Err(err) => match err { + sync_mpsc::TryRecvError::Empty => (), + sync_mpsc::TryRecvError::Disconnected => { + if let Some(server) = server.take() { + let _ = poll.deregister(&server); + } + return; + } + }, + }, + _ => unreachable!(), + } + } + } + }); + + (readiness, tx) +} + +/// This function defines errors that are per-connection. Which basically +/// means that if we get this error from `accept()` system call it means +/// next connection might be ready to be accepted. +/// +/// All other errors will incur a timeout before next `accept()` is performed. +/// The timeout is useful to handle resource exhaustion errors like ENFILE +/// and EMFILE. Otherwise, could enter into tight loop. +fn connection_error(e: &io::Error) -> bool { + e.kind() == io::ErrorKind::ConnectionRefused + || e.kind() == io::ErrorKind::ConnectionAborted + || e.kind() == io::ErrorKind::ConnectionReset +} diff --git a/src/server/mod.rs b/src/server/mod.rs index dc8ecd81..a4f5e87d 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -7,6 +7,7 @@ use futures::{Async, Poll}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_tcp::TcpStream; +pub(crate) mod accept; mod channel; mod error; pub(crate) mod h1; diff --git a/src/server/srv.rs b/src/server/srv.rs index d6f5cf4d..a054d5a7 100644 --- a/src/server/srv.rs +++ b/src/server/srv.rs @@ -1,7 +1,7 @@ use std::rc::Rc; use std::sync::{mpsc as sync_mpsc, Arc}; use std::time::Duration; -use std::{io, net, thread}; +use std::{io, net}; use actix::{ fut, signal, Actor, ActorFuture, Addr, Arbiter, AsyncContext, Context, Handler, @@ -25,6 +25,7 @@ use openssl::ssl::{AlpnError, SslAcceptorBuilder}; #[cfg(feature = "rust-tls")] use rustls::ServerConfig; +use super::accept::{start_accept_thread, Command}; use super::channel::{HttpChannel, WrapperStream}; use super::settings::{ServerSettings, WorkerSettings}; use super::worker::{Conn, SocketInfo, StopWorker, StreamHandlerType, Worker}; @@ -75,7 +76,7 @@ where no_signals: bool, } -enum ServerCommand { +pub(crate) enum ServerCommand { WorkerDied(usize, Slab), } @@ -86,10 +87,10 @@ where type Context = Context; } -struct Socket { - lst: net::TcpListener, - addr: net::SocketAddr, - tp: StreamHandlerType, +pub(crate) struct Socket { + pub lst: net::TcpListener, + pub addr: net::SocketAddr, + pub tp: StreamHandlerType, } impl HttpServer @@ -132,7 +133,10 @@ where } #[doc(hidden)] - #[deprecated(since = "0.6.0", note = "please use `HttpServer::workers()` instead")] + #[deprecated( + since = "0.6.0", + note = "please use `HttpServer::workers()` instead" + )] pub fn threads(self, num: usize) -> Self { self.workers(num) } @@ -538,7 +542,8 @@ impl HttpServer { #[doc(hidden)] #[cfg(feature = "tls")] #[deprecated( - since = "0.6.0", note = "please use `actix_web::HttpServer::bind_tls` instead" + since = "0.6.0", + note = "please use `actix_web::HttpServer::bind_tls` instead" )] impl HttpServer { /// Start listening for incoming tls connections. @@ -557,7 +562,8 @@ impl HttpServer { #[doc(hidden)] #[cfg(feature = "alpn")] #[deprecated( - since = "0.6.0", note = "please use `actix_web::HttpServer::bind_ssl` instead" + since = "0.6.0", + note = "please use `actix_web::HttpServer::bind_ssl` instead" )] impl HttpServer { /// Start listening for incoming tls connections. @@ -810,181 +816,6 @@ impl Handler for HttpServer { } } -enum Command { - Pause, - Resume, - Stop, - Worker(usize, mpsc::UnboundedSender>), -} - -fn start_accept_thread( - token: usize, sock: Socket, srv: mpsc::UnboundedSender, - socks: Slab, - mut workers: Vec<(usize, mpsc::UnboundedSender>)>, -) -> (mio::SetReadiness, sync_mpsc::Sender) { - let (tx, rx) = sync_mpsc::channel(); - let (reg, readiness) = mio::Registration::new2(); - - // start accept thread - #[cfg_attr(feature = "cargo-clippy", allow(cyclomatic_complexity))] - let _ = thread::Builder::new() - .name(format!("Accept on {}", sock.addr)) - .spawn(move || { - const SRV: mio::Token = mio::Token(0); - const CMD: mio::Token = mio::Token(1); - - let addr = sock.addr; - let mut server = Some( - mio::net::TcpListener::from_std(sock.lst) - .expect("Can not create mio::net::TcpListener"), - ); - - // Create a poll instance - let poll = match mio::Poll::new() { - Ok(poll) => poll, - Err(err) => panic!("Can not create mio::Poll: {}", err), - }; - - // Start listening for incoming connections - if let Some(ref srv) = server { - if let Err(err) = - poll.register(srv, SRV, mio::Ready::readable(), mio::PollOpt::edge()) - { - panic!("Can not register io: {}", err); - } - } - - // Start listening for incoming commands - if let Err(err) = - poll.register(®, CMD, mio::Ready::readable(), mio::PollOpt::edge()) - { - panic!("Can not register Registration: {}", err); - } - - // Create storage for events - let mut events = mio::Events::with_capacity(128); - - // Sleep on error - let sleep = Duration::from_millis(100); - - let mut next = 0; - loop { - if let Err(err) = poll.poll(&mut events, None) { - panic!("Poll error: {}", err); - } - - for event in events.iter() { - match event.token() { - SRV => if let Some(ref server) = server { - loop { - match server.accept_std() { - Ok((io, addr)) => { - let mut msg = Conn { - io, - token, - peer: Some(addr), - http2: false, - }; - while !workers.is_empty() { - match workers[next].1.unbounded_send(msg) { - Ok(_) => (), - Err(err) => { - let _ = srv.unbounded_send( - ServerCommand::WorkerDied( - workers[next].0, - socks.clone(), - ), - ); - msg = err.into_inner(); - workers.swap_remove(next); - if workers.is_empty() { - error!("No workers"); - thread::sleep(sleep); - break; - } else if workers.len() <= next { - next = 0; - } - continue; - } - } - next = (next + 1) % workers.len(); - break; - } - } - Err(ref e) - if e.kind() == io::ErrorKind::WouldBlock => - { - break - } - Err(ref e) if connection_error(e) => continue, - Err(e) => { - error!("Error accepting connection: {}", e); - // sleep after error - thread::sleep(sleep); - break; - } - } - } - }, - CMD => match rx.try_recv() { - Ok(cmd) => match cmd { - Command::Pause => if let Some(ref server) = server { - if let Err(err) = poll.deregister(server) { - error!( - "Can not deregister server socket {}", - err - ); - } else { - info!( - "Paused accepting connections on {}", - addr - ); - } - }, - Command::Resume => { - if let Some(ref server) = server { - if let Err(err) = poll.register( - server, - SRV, - mio::Ready::readable(), - mio::PollOpt::edge(), - ) { - error!("Can not resume socket accept process: {}", err); - } else { - info!("Accepting connections on {} has been resumed", - addr); - } - } - } - Command::Stop => { - if let Some(server) = server.take() { - let _ = poll.deregister(&server); - } - return; - } - Command::Worker(idx, addr) => { - workers.push((idx, addr)); - } - }, - Err(err) => match err { - sync_mpsc::TryRecvError::Empty => (), - sync_mpsc::TryRecvError::Disconnected => { - if let Some(server) = server.take() { - let _ = poll.deregister(&server); - } - return; - } - }, - }, - _ => unreachable!(), - } - } - } - }); - - (readiness, tx) -} - fn create_tcp_listener( addr: net::SocketAddr, backlog: i32, ) -> io::Result { @@ -996,16 +827,3 @@ fn create_tcp_listener( builder.bind(addr)?; Ok(builder.listen(backlog)?) } - -/// This function defines errors that are per-connection. Which basically -/// means that if we get this error from `accept()` system call it means -/// next connection might be ready to be accepted. -/// -/// All other errors will incur a timeout before next `accept()` is performed. -/// The timeout is useful to handle resource exhaustion errors like ENFILE -/// and EMFILE. Otherwise, could enter into tight loop. -fn connection_error(e: &io::Error) -> bool { - e.kind() == io::ErrorKind::ConnectionRefused - || e.kind() == io::ErrorKind::ConnectionAborted - || e.kind() == io::ErrorKind::ConnectionReset -}