use std::rc::Rc; use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc}; use std::{net, time}; use futures::sync::mpsc::{unbounded, SendError, UnboundedSender}; use futures::sync::oneshot; use futures::Future; use net2::TcpStreamExt; use slab::Slab; use tokio::executor::current_thread; use tokio_reactor::Handle; use tokio_tcp::TcpStream; #[cfg(any(feature = "tls", feature = "alpn", feature = "rust-tls"))] use futures::future; #[cfg(feature = "tls")] use native_tls::TlsAcceptor; #[cfg(feature = "tls")] use tokio_tls::TlsAcceptorExt; #[cfg(feature = "alpn")] use openssl::ssl::SslAcceptor; #[cfg(feature = "alpn")] use tokio_openssl::SslAcceptorExt; #[cfg(feature = "rust-tls")] use rustls::{ServerConfig, Session}; #[cfg(feature = "rust-tls")] use tokio_rustls::ServerConfigExt; use actix::msgs::StopArbiter; use actix::{Actor, Addr, Arbiter, AsyncContext, Context, Handler, Message, Response}; use super::accept::AcceptNotify; use super::channel::HttpChannel; use super::settings::{ServerSettings, WorkerSettings}; use super::{HttpHandler, IntoHttpHandler, KeepAlive}; #[derive(Message)] pub(crate) struct Conn { pub io: T, pub token: usize, pub peer: Option, pub http2: bool, } #[derive(Clone)] pub(crate) struct SocketInfo { pub addr: net::SocketAddr, pub htype: StreamHandlerType, } pub(crate) struct WorkersPool { sockets: Slab, pub factory: Arc Vec + Send + Sync>, pub host: Option, pub keep_alive: KeepAlive, } impl WorkersPool { pub fn new(factory: F) -> Self where F: Fn() -> Vec + Send + Sync + 'static, { WorkersPool { factory: Arc::new(factory), host: None, keep_alive: KeepAlive::Os, sockets: Slab::new(), } } pub fn insert(&mut self, addr: net::SocketAddr, htype: StreamHandlerType) -> usize { let entry = self.sockets.vacant_entry(); let token = entry.key(); entry.insert(SocketInfo { addr, htype }); token } pub fn start( &mut self, idx: usize, notify: AcceptNotify, ) -> (WorkerClient, Addr>) { let host = self.host.clone(); let addr = self.sockets[0].addr; let factory = Arc::clone(&self.factory); let socks = self.sockets.clone(); let ka = self.keep_alive; let (tx, rx) = unbounded::>(); let client = WorkerClient::new(idx, tx, self.sockets.clone()); let conn = client.conn.clone(); let sslrate = client.sslrate.clone(); let addr = Arbiter::start(move |ctx: &mut Context<_>| { let s = ServerSettings::new(Some(addr), &host, false); let apps: Vec<_> = (*factory)().into_iter().map(|h| h.into_handler()).collect(); ctx.add_message_stream(rx); Worker::new(apps, socks, ka, s, conn, sslrate, notify) }); (client, addr) } } #[derive(Clone)] pub(crate) struct WorkerClient { pub idx: usize, tx: UnboundedSender>, info: Slab, pub conn: Arc, pub sslrate: Arc, } impl WorkerClient { fn new( idx: usize, tx: UnboundedSender>, info: Slab, ) -> Self { WorkerClient { idx, tx, info, conn: Arc::new(AtomicUsize::new(0)), sslrate: Arc::new(AtomicUsize::new(0)), } } pub fn send( &self, msg: Conn, ) -> Result<(), SendError>> { self.tx.unbounded_send(msg) } pub fn available(&self, maxconn: usize, maxsslrate: usize) -> bool { if maxsslrate <= self.sslrate.load(Ordering::Relaxed) { false } else { maxconn > self.conn.load(Ordering::Relaxed) } } } /// Stop worker message. Returns `true` on successful shutdown /// and `false` if some connections still alive. pub(crate) struct StopWorker { pub graceful: Option, } impl Message for StopWorker { type Result = Result; } /// Http worker /// /// Worker accepts Socket objects via unbounded channel and start requests /// processing. pub(crate) struct Worker where H: HttpHandler + 'static, { settings: Rc>, socks: Slab, tcp_ka: Option, } impl Worker { pub(crate) fn new( h: Vec, socks: Slab, keep_alive: KeepAlive, settings: ServerSettings, conn: Arc, sslrate: Arc, notify: AcceptNotify, ) -> Worker { let tcp_ka = if let KeepAlive::Tcp(val) = keep_alive { Some(time::Duration::new(val as u64, 0)) } else { None }; Worker { settings: Rc::new(WorkerSettings::new( h, keep_alive, settings, notify, conn, sslrate, )), socks, tcp_ka, } } fn update_time(&self, ctx: &mut Context) { self.settings.update_date(); ctx.run_later(time::Duration::new(1, 0), |slf, ctx| slf.update_time(ctx)); } fn shutdown_timeout( &self, ctx: &mut Context, tx: oneshot::Sender, dur: time::Duration, ) { // sleep for 1 second and then check again ctx.run_later(time::Duration::new(1, 0), move |slf, ctx| { let num = slf.settings.num_channels(); if num == 0 { let _ = tx.send(true); Arbiter::current().do_send(StopArbiter(0)); } else if let Some(d) = dur.checked_sub(time::Duration::new(1, 0)) { slf.shutdown_timeout(ctx, tx, d); } else { info!("Force shutdown http worker, {} connections", num); slf.settings.head().traverse::(); let _ = tx.send(false); Arbiter::current().do_send(StopArbiter(0)); } }); } } impl Actor for Worker where H: HttpHandler + 'static, { type Context = Context; fn started(&mut self, ctx: &mut Self::Context) { self.update_time(ctx); } } impl Handler> for Worker where H: HttpHandler + 'static, { type Result = (); fn handle(&mut self, msg: Conn, _: &mut Context) { if self.tcp_ka.is_some() && msg.io.set_keepalive(self.tcp_ka).is_err() { error!("Can not set socket keep-alive option"); } self.socks .get_mut(msg.token) .unwrap() .htype .handle(Rc::clone(&self.settings), msg); } } /// `StopWorker` message handler impl Handler for Worker where H: HttpHandler + 'static, { type Result = Response; fn handle(&mut self, msg: StopWorker, ctx: &mut Context) -> Self::Result { let num = self.settings.num_channels(); if num == 0 { info!("Shutting down http worker, 0 connections"); Response::reply(Ok(true)) } else if let Some(dur) = msg.graceful { info!("Graceful http worker shutdown, {} connections", num); let (tx, rx) = oneshot::channel(); self.shutdown_timeout(ctx, tx, dur); Response::async(rx.map_err(|_| ())) } else { info!("Force shutdown http worker, {} connections", num); self.settings.head().traverse::(); Response::reply(Ok(false)) } } } #[derive(Clone)] pub(crate) enum StreamHandlerType { Normal, #[cfg(feature = "tls")] Tls(TlsAcceptor), #[cfg(feature = "alpn")] Alpn(SslAcceptor), #[cfg(feature = "rust-tls")] Rustls(Arc), } impl StreamHandlerType { pub fn is_ssl(&self) -> bool { match *self { StreamHandlerType::Normal => false, #[cfg(feature = "tls")] StreamHandlerType::Tls(_) => true, #[cfg(feature = "alpn")] StreamHandlerType::Alpn(_) => true, #[cfg(feature = "rust-tls")] StreamHandlerType::Rustls(_) => true, } } fn handle( &mut self, h: Rc>, msg: Conn, ) { match *self { StreamHandlerType::Normal => { let _ = msg.io.set_nodelay(true); let io = TcpStream::from_std(msg.io, &Handle::default()) .expect("failed to associate TCP stream"); current_thread::spawn(HttpChannel::new(h, io, msg.peer, msg.http2)); } #[cfg(feature = "tls")] StreamHandlerType::Tls(ref acceptor) => { let Conn { io, peer, http2, .. } = msg; let _ = io.set_nodelay(true); let io = TcpStream::from_std(io, &Handle::default()) .expect("failed to associate TCP stream"); self.settings.ssl_conn_add(); current_thread::spawn(TlsAcceptorExt::accept_async(acceptor, io).then( move |res| { self.settings.ssl_conn_del(); match res { Ok(io) => current_thread::spawn(HttpChannel::new( h, io, peer, http2, )), Err(err) => { trace!("Error during handling tls connection: {}", err) } }; future::result(Ok(())) }, )); } #[cfg(feature = "alpn")] StreamHandlerType::Alpn(ref acceptor) => { let Conn { io, peer, .. } = msg; let _ = io.set_nodelay(true); let io = TcpStream::from_std(io, &Handle::default()) .expect("failed to associate TCP stream"); self.settings.ssl_conn_add(); current_thread::spawn(SslAcceptorExt::accept_async(acceptor, io).then( move |res| { self.settings.ssl_conn_del(); match res { Ok(io) => { let http2 = if let Some(p) = io.get_ref().ssl().selected_alpn_protocol() { p.len() == 2 && &p == b"h2" } else { false }; current_thread::spawn(HttpChannel::new( h, io, peer, http2, )); } Err(err) => { trace!("Error during handling tls connection: {}", err) } }; future::result(Ok(())) }, )); } #[cfg(feature = "rust-tls")] StreamHandlerType::Rustls(ref acceptor) => { let Conn { io, peer, .. } = msg; let _ = io.set_nodelay(true); let io = TcpStream::from_std(io, &Handle::default()) .expect("failed to associate TCP stream"); self.settings.ssl_conn_add(); current_thread::spawn(ServerConfigExt::accept_async(acceptor, io).then( move |res| { self.settings.ssl_conn_del(); match res { Ok(io) => { let http2 = if let Some(p) = io.get_ref().1.get_alpn_protocol() { p.len() == 2 && &p == &"h2" } else { false }; current_thread::spawn(HttpChannel::new( h, io, peer, http2, )); } Err(err) => { trace!("Error during handling tls connection: {}", err) } }; future::result(Ok(())) }, )); } } } pub(crate) fn scheme(&self) -> &'static str { match *self { StreamHandlerType::Normal => "http", #[cfg(feature = "tls")] StreamHandlerType::Tls(_) => "https", #[cfg(feature = "alpn")] StreamHandlerType::Alpn(_) => "https", #[cfg(feature = "rust-tls")] StreamHandlerType::Rustls(_) => "https", } } }