use std::rc::Rc; use std::sync::{mpsc as sync_mpsc, Arc}; use std::time::Duration; use std::{io, net, thread}; use actix::{ fut, signal, Actor, ActorFuture, Addr, Arbiter, AsyncContext, Context, Handler, Response, StreamHandler, System, WrapFuture, }; use futures::sync::mpsc; use futures::{Future, Sink, Stream}; use mio; use net2::TcpBuilder; use num_cpus; use slab::Slab; use tokio_io::{AsyncRead, AsyncWrite}; #[cfg(feature = "tls")] use native_tls::TlsAcceptor; #[cfg(feature = "alpn")] use openssl::ssl::{AlpnError, SslAcceptorBuilder}; use super::channel::WrapperStream; use super::settings::{ServerSettings, WorkerSettings}; use super::worker::{Conn, SocketInfo, StopWorker, StreamHandlerType, Worker}; use super::{IntoHttpHandler, IoStream, KeepAlive}; use super::{PauseServer, ResumeServer, StopServer}; #[cfg(feature = "alpn")] fn configure_alpn(builder: &mut SslAcceptorBuilder) -> io::Result<()> { builder.set_alpn_protos(b"\x02h2\x08http/1.1")?; builder.set_alpn_select_callback(|_, protos| { const H2: &[u8] = b"\x02h2"; if protos.windows(3).any(|window| window == H2) { Ok(b"h2") } else { Err(AlpnError::NOACK) } }); Ok(()) } /// An HTTP Server pub struct HttpServer where H: IntoHttpHandler + 'static, { h: Option>>, threads: usize, backlog: i32, host: Option, keep_alive: KeepAlive, factory: Arc Vec + Send + Sync>, #[cfg_attr(feature = "cargo-clippy", allow(type_complexity))] workers: Vec<(usize, Addr>)>, sockets: Vec, accept: Vec<(mio::SetReadiness, sync_mpsc::Sender)>, exit: bool, shutdown_timeout: u16, signals: Option>, no_http2: bool, no_signals: bool, } unsafe impl Send for HttpServer {} enum ServerCommand { WorkerDied(usize, Slab), } impl Actor for HttpServer where H: IntoHttpHandler, { type Context = Context; } struct Socket { lst: net::TcpListener, addr: net::SocketAddr, tp: StreamHandlerType, } impl HttpServer where H: IntoHttpHandler + 'static, { /// Create new http server with application factory pub fn new(factory: F) -> Self where F: Fn() -> U + Sync + Send + 'static, U: IntoIterator + 'static, { let f = move || (factory)().into_iter().collect(); HttpServer { h: None, threads: num_cpus::get(), backlog: 2048, host: None, keep_alive: KeepAlive::Os, factory: Arc::new(f), workers: Vec::new(), sockets: Vec::new(), accept: Vec::new(), exit: false, shutdown_timeout: 30, signals: None, no_http2: false, no_signals: false, } } /// Set number of workers to start. /// /// By default http server uses number of available logical cpu as threads /// count. pub fn workers(mut self, num: usize) -> Self { self.threads = num; self } #[doc(hidden)] #[deprecated(since = "0.6.0", note = "please use `HttpServer::workers()` instead")] pub fn threads(self, num: usize) -> Self { self.workers(num) } /// Set the maximum number of pending connections. /// /// This refers to the number of clients that can be waiting to be served. /// Exceeding this number results in the client getting an error when /// attempting to connect. It should only affect servers under significant /// load. /// /// Generally set in the 64-2048 range. Default value is 2048. /// /// This method should be called before `bind()` method call. pub fn backlog(mut self, num: i32) -> Self { self.backlog = num; self } /// Set server keep-alive setting. /// /// By default keep alive is set to a `Os`. pub fn keep_alive>(mut self, val: T) -> Self { self.keep_alive = val.into(); self } /// Set server host name. /// /// Host name is used by application router aa a hostname for url /// generation. Check [ConnectionInfo](./dev/struct.ConnectionInfo. /// html#method.host) documentation for more information. pub fn server_hostname(mut self, val: String) -> Self { self.host = Some(val); self } /// Stop actix system. /// /// `SystemExit` message stops currently running system. pub fn system_exit(mut self) -> Self { self.exit = true; self } /// Set alternative address for `ProcessSignals` actor. pub fn signals(mut self, addr: Addr) -> Self { self.signals = Some(addr); self } /// Disable signal handling pub fn disable_signals(mut self) -> Self { self.no_signals = true; self } /// Timeout for graceful workers shutdown. /// /// After receiving a stop signal, workers have this much time to finish /// serving requests. Workers still alive after the timeout are force /// dropped. /// /// By default shutdown timeout sets to 30 seconds. pub fn shutdown_timeout(mut self, sec: u16) -> Self { self.shutdown_timeout = sec; self } /// Disable `HTTP/2` support pub fn no_http2(mut self) -> Self { self.no_http2 = true; self } /// Get addresses of bound sockets. pub fn addrs(&self) -> Vec { self.sockets.iter().map(|s| s.addr).collect() } /// Get addresses of bound sockets and the scheme for it. /// /// This is useful when the server is bound from different sources /// with some sockets listening on http and some listening on https /// and the user should be presented with an enumeration of which /// socket requires which protocol. pub fn addrs_with_scheme(&self) -> Vec<(net::SocketAddr, &str)> { self.sockets .iter() .map(|s| (s.addr, s.tp.scheme())) .collect() } /// Use listener for accepting incoming connection requests /// /// HttpServer does not change any configuration for TcpListener, /// it needs to be configured before passing it to listen() method. pub fn listen(mut self, lst: net::TcpListener) -> Self { let addr = lst.local_addr().unwrap(); self.sockets.push(Socket { addr, lst, tp: StreamHandlerType::Normal, }); self } #[cfg(feature = "tls")] /// Use listener for accepting incoming tls connection requests /// /// HttpServer does not change any configuration for TcpListener, /// it needs to be configured before passing it to listen() method. pub fn listen_tls(mut self, lst: net::TcpListener, acceptor: TlsAcceptor) -> Self { let addr = lst.local_addr().unwrap(); self.sockets.push(Socket { addr, lst, tp: StreamHandlerType::Tls(acceptor.clone()), }); self } #[cfg(feature = "alpn")] /// Use listener for accepting incoming tls connection requests /// /// This method sets alpn protocols to "h2" and "http/1.1" pub fn listen_ssl( mut self, lst: net::TcpListener, mut builder: SslAcceptorBuilder, ) -> io::Result { // alpn support if !self.no_http2 { configure_alpn(&mut builder)?; } let acceptor = builder.build(); let addr = lst.local_addr().unwrap(); self.sockets.push(Socket { addr, lst, tp: StreamHandlerType::Alpn(acceptor.clone()), }); Ok(self) } fn bind2(&mut self, addr: S) -> io::Result> { let mut err = None; let mut succ = false; let mut sockets = Vec::new(); for addr in addr.to_socket_addrs()? { match create_tcp_listener(addr, self.backlog) { Ok(lst) => { succ = true; let addr = lst.local_addr().unwrap(); sockets.push(Socket { lst, addr, tp: StreamHandlerType::Normal, }); } Err(e) => err = Some(e), } } if !succ { if let Some(e) = err.take() { Err(e) } else { Err(io::Error::new( io::ErrorKind::Other, "Can not bind to address.", )) } } else { Ok(sockets) } } /// The socket address to bind /// /// To bind multiple addresses this method can be called multiple times. pub fn bind(mut self, addr: S) -> io::Result { let sockets = self.bind2(addr)?; self.sockets.extend(sockets); Ok(self) } #[cfg(feature = "tls")] /// The ssl socket address to bind /// /// To bind multiple addresses this method can be called multiple times. pub fn bind_tls( mut self, addr: S, acceptor: TlsAcceptor, ) -> io::Result { let sockets = self.bind2(addr)?; self.sockets.extend(sockets.into_iter().map(|mut s| { s.tp = StreamHandlerType::Tls(acceptor.clone()); s })); Ok(self) } #[cfg(feature = "alpn")] /// Start listening for incoming tls connections. /// /// This method sets alpn protocols to "h2" and "http/1.1" pub fn bind_ssl( mut self, addr: S, mut builder: SslAcceptorBuilder, ) -> io::Result { // alpn support if !self.no_http2 { configure_alpn(&mut builder)?; } let acceptor = builder.build(); let sockets = self.bind2(addr)?; self.sockets.extend(sockets.into_iter().map(|mut s| { s.tp = StreamHandlerType::Alpn(acceptor.clone()); s })); Ok(self) } fn start_workers( &mut self, settings: &ServerSettings, sockets: &Slab, ) -> Vec<(usize, mpsc::UnboundedSender>)> { // start workers let mut workers = Vec::new(); for idx in 0..self.threads { let (tx, rx) = mpsc::unbounded::>(); let ka = self.keep_alive; let socks = sockets.clone(); let factory = Arc::clone(&self.factory); let parts = settings.parts(); let addr = Arbiter::start(move |ctx: &mut Context<_>| { let s = ServerSettings::from_parts(parts); let apps: Vec<_> = (*factory)() .into_iter() .map(|h| h.into_handler(s.clone())) .collect(); ctx.add_message_stream(rx); Worker::new(apps, socks, ka) }); workers.push((idx, tx)); self.workers.push((idx, addr)); } info!("Starting {} http workers", self.threads); workers } // subscribe to os signals fn subscribe_to_signals(&self) -> Option> { if !self.no_signals { if let Some(ref signals) = self.signals { Some(signals.clone()) } else { Some(System::current().registry().get::()) } } else { None } } } impl HttpServer { /// Start listening for incoming connections. /// /// This method starts number of http handler workers in separate threads. /// For each address this method starts separate thread which does /// `accept()` in a loop. /// /// This methods panics if no socket addresses get bound. /// /// This method requires to run within properly configured `Actix` system. /// /// ```rust /// extern crate actix_web; /// use actix_web::{actix, server, App, HttpResponse}; /// /// fn main() { /// let sys = actix::System::new("example"); // <- create Actix system /// /// server::new(|| App::new().resource("/", |r| r.h(|_| HttpResponse::Ok()))) /// .bind("127.0.0.1:0") /// .expect("Can not bind to 127.0.0.1:0") /// .start(); /// # actix::System::current().stop(); /// sys.run(); // <- Run actix system, this method starts all async processes /// } /// ``` pub fn start(mut self) -> Addr { if self.sockets.is_empty() { panic!("HttpServer::bind() has to be called before start()"); } else { let (tx, rx) = mpsc::unbounded(); let mut socks = Slab::new(); let mut addrs: Vec<(usize, Socket)> = Vec::new(); for socket in self.sockets.drain(..) { let entry = socks.vacant_entry(); let token = entry.key(); entry.insert(SocketInfo { addr: socket.addr, htype: socket.tp.clone(), }); addrs.push((token, socket)); } let settings = ServerSettings::new(Some(addrs[0].1.addr), &self.host, false); let workers = self.start_workers(&settings, &socks); // start acceptors threads for (token, sock) in addrs { info!("Starting server on http://{}", sock.addr); self.accept.push(start_accept_thread( token, sock, tx.clone(), socks.clone(), workers.clone(), )); } // start http server actor let signals = self.subscribe_to_signals(); let addr = Actor::create(move |ctx| { ctx.add_stream(rx); self }); if let Some(signals) = signals { signals.do_send(signal::Subscribe(addr.clone().recipient())) } addr } } /// Spawn new thread and start listening for incoming connections. /// /// This method spawns new thread and starts new actix system. Other than /// that it is similar to `start()` method. This method blocks. /// /// This methods panics if no socket addresses get bound. /// /// ```rust,ignore /// # extern crate futures; /// # extern crate actix_web; /// # use futures::Future; /// use actix_web::*; /// /// fn main() { /// HttpServer::new(|| App::new().resource("/", |r| r.h(|_| HttpResponse::Ok()))) /// .bind("127.0.0.1:0") /// .expect("Can not bind to 127.0.0.1:0") /// .run(); /// } /// ``` pub fn run(mut self) { self.exit = true; self.no_signals = false; let _ = thread::spawn(move || { let sys = System::new("http-server"); self.start(); sys.run(); }).join(); } } #[doc(hidden)] #[cfg(feature = "tls")] #[deprecated( since = "0.6.0", note = "please use `actix_web::HttpServer::bind_tls` instead" )] impl HttpServer { /// Start listening for incoming tls connections. pub fn start_tls(mut self, acceptor: TlsAcceptor) -> io::Result> { for sock in &mut self.sockets { match sock.tp { StreamHandlerType::Normal => (), _ => continue, } sock.tp = StreamHandlerType::Tls(acceptor.clone()); } Ok(self.start()) } } #[doc(hidden)] #[cfg(feature = "alpn")] #[deprecated( since = "0.6.0", note = "please use `actix_web::HttpServer::bind_ssl` instead" )] impl HttpServer { /// Start listening for incoming tls connections. /// /// This method sets alpn protocols to "h2" and "http/1.1" pub fn start_ssl( mut self, mut builder: SslAcceptorBuilder, ) -> io::Result> { // alpn support if !self.no_http2 { builder.set_alpn_protos(b"\x02h2\x08http/1.1")?; builder.set_alpn_select_callback(|_, protos| { const H2: &[u8] = b"\x02h2"; if protos.windows(3).any(|window| window == H2) { Ok(b"h2") } else { Err(AlpnError::NOACK) } }); } let acceptor = builder.build(); for sock in &mut self.sockets { match sock.tp { StreamHandlerType::Normal => (), _ => continue, } sock.tp = StreamHandlerType::Alpn(acceptor.clone()); } Ok(self.start()) } } impl HttpServer { /// Start listening for incoming connections from a stream. /// /// This method uses only one thread for handling incoming connections. pub fn start_incoming(mut self, stream: S, secure: bool) -> Addr where S: Stream + Send + 'static, T: AsyncRead + AsyncWrite + Send + 'static, { // set server settings let addr: net::SocketAddr = "127.0.0.1:8080".parse().unwrap(); let settings = ServerSettings::new(Some(addr), &self.host, secure); let apps: Vec<_> = (*self.factory)() .into_iter() .map(|h| h.into_handler(settings.clone())) .collect(); self.h = Some(Rc::new(WorkerSettings::new(apps, self.keep_alive))); // start server let signals = self.subscribe_to_signals(); let addr = HttpServer::create(move |ctx| { ctx.add_message_stream(stream.map_err(|_| ()).map(move |t| Conn { io: WrapperStream::new(t), token: 0, peer: None, http2: false, })); self }); if let Some(signals) = signals { signals.do_send(signal::Subscribe(addr.clone().recipient())) } addr } } /// Signals support /// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system /// message to `System` actor. impl Handler for HttpServer { type Result = (); fn handle(&mut self, msg: signal::Signal, ctx: &mut Context) { match msg.0 { signal::SignalType::Int => { info!("SIGINT received, exiting"); self.exit = true; Handler::::handle(self, StopServer { graceful: false }, ctx); } signal::SignalType::Term => { info!("SIGTERM received, stopping"); self.exit = true; Handler::::handle(self, StopServer { graceful: true }, ctx); } signal::SignalType::Quit => { info!("SIGQUIT received, exiting"); self.exit = true; Handler::::handle(self, StopServer { graceful: false }, ctx); } _ => (), } } } /// Commands from accept threads impl StreamHandler for HttpServer { fn handle(&mut self, msg: Result, ()>, _: &mut Context) { if let Ok(Some(ServerCommand::WorkerDied(idx, socks))) = msg { let mut found = false; for i in 0..self.workers.len() { if self.workers[i].0 == idx { self.workers.swap_remove(i); found = true; break; } } if found { error!("Worker has died {:?}, restarting", idx); let (tx, rx) = mpsc::unbounded::>(); let mut new_idx = self.workers.len(); 'found: loop { for i in 0..self.workers.len() { if self.workers[i].0 == new_idx { new_idx += 1; continue 'found; } } break; } let ka = self.keep_alive; let factory = Arc::clone(&self.factory); let host = self.host.clone(); let addr = socks[0].addr; let addr = Arbiter::start(move |ctx: &mut Context<_>| { let settings = ServerSettings::new(Some(addr), &host, false); let apps: Vec<_> = (*factory)() .into_iter() .map(|h| h.into_handler(settings.clone())) .collect(); ctx.add_message_stream(rx); Worker::new(apps, socks, ka) }); for item in &self.accept { let _ = item.1.send(Command::Worker(new_idx, tx.clone())); let _ = item.0.set_readiness(mio::Ready::readable()); } self.workers.push((new_idx, addr)); } } } } impl Handler> for HttpServer where T: IoStream, H: IntoHttpHandler, { type Result = (); fn handle(&mut self, _msg: Conn, _: &mut Context) -> Self::Result { unimplemented!(); /*Arbiter::spawn(HttpChannel::new( Rc::clone(self.h.as_ref().unwrap()), msg.io, msg.peer, msg.http2, ));*/ } } impl Handler for HttpServer { type Result = (); fn handle(&mut self, _: PauseServer, _: &mut Context) { for item in &self.accept { let _ = item.1.send(Command::Pause); let _ = item.0.set_readiness(mio::Ready::readable()); } } } impl Handler for HttpServer { type Result = (); fn handle(&mut self, _: ResumeServer, _: &mut Context) { for item in &self.accept { let _ = item.1.send(Command::Resume); let _ = item.0.set_readiness(mio::Ready::readable()); } } } impl Handler for HttpServer { type Result = Response<(), ()>; fn handle(&mut self, msg: StopServer, ctx: &mut Context) -> Self::Result { // stop accept threads for item in &self.accept { let _ = item.1.send(Command::Stop); let _ = item.0.set_readiness(mio::Ready::readable()); } // stop workers let (tx, rx) = mpsc::channel(1); let dur = if msg.graceful { Some(Duration::new(u64::from(self.shutdown_timeout), 0)) } else { None }; for worker in &self.workers { let tx2 = tx.clone(); ctx.spawn( worker .1 .send(StopWorker { graceful: dur }) .into_actor(self) .then(move |_, slf, ctx| { slf.workers.pop(); if slf.workers.is_empty() { let _ = tx2.send(()); // we need to stop system if server was spawned if slf.exit { ctx.run_later(Duration::from_millis(300), |_, _| { System::current().stop(); }); } } fut::ok(()) }), ); } if !self.workers.is_empty() { Response::async(rx.into_future().map(|_| ()).map_err(|_| ())) } else { // we need to stop system if server was spawned if self.exit { ctx.run_later(Duration::from_millis(300), |_, _| { System::current().stop(); }); } Response::reply(Ok(())) } } } enum Command { Pause, Resume, Stop, Worker(usize, mpsc::UnboundedSender>), } fn start_accept_thread( token: usize, sock: Socket, srv: mpsc::UnboundedSender, socks: Slab, mut workers: Vec<(usize, mpsc::UnboundedSender>)>, ) -> (mio::SetReadiness, sync_mpsc::Sender) { let (tx, rx) = sync_mpsc::channel(); let (reg, readiness) = mio::Registration::new2(); // start accept thread #[cfg_attr(feature = "cargo-clippy", allow(cyclomatic_complexity))] let _ = thread::Builder::new() .name(format!("Accept on {}", sock.addr)) .spawn(move || { const SRV: mio::Token = mio::Token(0); const CMD: mio::Token = mio::Token(1); let addr = sock.addr; let mut server = Some( mio::net::TcpListener::from_std(sock.lst) .expect("Can not create mio::net::TcpListener"), ); // Create a poll instance let poll = match mio::Poll::new() { Ok(poll) => poll, Err(err) => panic!("Can not create mio::Poll: {}", err), }; // Start listening for incoming connections if let Some(ref srv) = server { if let Err(err) = poll.register(srv, SRV, mio::Ready::readable(), mio::PollOpt::edge()) { panic!("Can not register io: {}", err); } } // Start listening for incoming commands if let Err(err) = poll.register(®, CMD, mio::Ready::readable(), mio::PollOpt::edge()) { panic!("Can not register Registration: {}", err); } // Create storage for events let mut events = mio::Events::with_capacity(128); // Sleep on error let sleep = Duration::from_millis(100); let mut next = 0; loop { if let Err(err) = poll.poll(&mut events, None) { panic!("Poll error: {}", err); } for event in events.iter() { match event.token() { SRV => if let Some(ref server) = server { loop { match server.accept_std() { Ok((io, addr)) => { let mut msg = Conn { io, token, peer: Some(addr), http2: false, }; while !workers.is_empty() { match workers[next].1.unbounded_send(msg) { Ok(_) => (), Err(err) => { let _ = srv.unbounded_send( ServerCommand::WorkerDied( workers[next].0, socks.clone(), ), ); msg = err.into_inner(); workers.swap_remove(next); if workers.is_empty() { error!("No workers"); thread::sleep(sleep); break; } else if workers.len() <= next { next = 0; } continue; } } next = (next + 1) % workers.len(); break; } } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { break } Err(ref e) if connection_error(e) => continue, Err(e) => { error!("Error accepting connection: {}", e); // sleep after error thread::sleep(sleep); break; } } } }, CMD => match rx.try_recv() { Ok(cmd) => match cmd { Command::Pause => if let Some(ref server) = server { if let Err(err) = poll.deregister(server) { error!( "Can not deregister server socket {}", err ); } else { info!( "Paused accepting connections on {}", addr ); } }, Command::Resume => { if let Some(ref server) = server { if let Err(err) = poll.register( server, SRV, mio::Ready::readable(), mio::PollOpt::edge(), ) { error!("Can not resume socket accept process: {}", err); } else { info!("Accepting connections on {} has been resumed", addr); } } } Command::Stop => { if let Some(server) = server.take() { let _ = poll.deregister(&server); } return; } Command::Worker(idx, addr) => { workers.push((idx, addr)); } }, Err(err) => match err { sync_mpsc::TryRecvError::Empty => (), sync_mpsc::TryRecvError::Disconnected => { if let Some(server) = server.take() { let _ = poll.deregister(&server); } return; } }, }, _ => unreachable!(), } } } }); (readiness, tx) } fn create_tcp_listener( addr: net::SocketAddr, backlog: i32, ) -> io::Result { let builder = match addr { net::SocketAddr::V4(_) => TcpBuilder::new_v4()?, net::SocketAddr::V6(_) => TcpBuilder::new_v6()?, }; builder.reuse_address(true)?; builder.bind(addr)?; Ok(builder.listen(backlog)?) } /// This function defines errors that are per-connection. Which basically /// means that if we get this error from `accept()` system call it means /// next connection might be ready to be accepted. /// /// All other errors will incur a timeout before next `accept()` is performed. /// The timeout is useful to handle resource exhaustion errors like ENFILE /// and EMFILE. Otherwise, could enter into tight loop. fn connection_error(e: &io::Error) -> bool { e.kind() == io::ErrorKind::ConnectionRefused || e.kind() == io::ErrorKind::ConnectionAborted || e.kind() == io::ErrorKind::ConnectionReset }