From 0589f2ee495a31654c8d641390ce220ab0818655 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 27 Dec 2017 12:58:32 -0800 Subject: [PATCH] add server management commands --- Cargo.toml | 2 +- src/lib.rs | 5 +- src/server.rs | 226 +++++++++++++++++++++++++++++++++---------- tests/test_server.rs | 20 +++- 4 files changed, 196 insertions(+), 57 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 43c286f71..2eb523183 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -74,7 +74,7 @@ tokio-tls = { version="0.1", optional = true } tokio-openssl = { version="0.1", optional = true } [dependencies.actix] -version = "^0.3.4" +version = "^0.3.5" default-features = false features = [] diff --git a/src/lib.rs b/src/lib.rs index ec2dafa84..f0178178d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -71,8 +71,8 @@ extern crate brotli2; extern crate percent_encoding; extern crate smallvec; extern crate num_cpus; -extern crate actix; extern crate h2 as http2; +#[macro_use] extern crate actix; #[cfg(test)] #[macro_use] extern crate serde_derive; @@ -173,7 +173,8 @@ pub mod dev { pub use pipeline::Pipeline; pub use channel::{HttpChannel, HttpHandler, IntoHttpHandler}; pub use param::{FromParam, Params}; - pub use server::ServerSettings; pub use httprequest::UrlEncoded; pub use httpresponse::HttpResponseBuilder; + + pub use server::{ServerSettings, PauseServer, ResumeServer, StopServer}; } diff --git a/src/server.rs b/src/server.rs index 082fb5dec..ff0aa1dee 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,7 +1,7 @@ use std::{io, net, thread}; use std::rc::Rc; use std::cell::{RefCell, RefMut}; -use std::sync::Arc; +use std::sync::{Arc, mpsc as sync_mpsc}; use std::time::Duration; use std::marker::PhantomData; use std::collections::HashMap; @@ -106,6 +106,7 @@ pub struct HttpServer factory: Arc U + Send + Sync>, workers: Vec>>, sockets: HashMap, + accept: Vec<(mio::SetReadiness, sync_mpsc::Sender)>, } impl Actor for HttpServer { @@ -144,6 +145,7 @@ impl HttpServer factory: Arc::new(factory), workers: Vec::new(), sockets: HashMap::new(), + accept: Vec::new(), } } @@ -206,19 +208,10 @@ impl HttpServer let mut succ = false; if let Ok(iter) = addr.to_socket_addrs() { for addr in iter { - let builder = match addr { - net::SocketAddr::V4(_) => TcpBuilder::new_v4()?, - net::SocketAddr::V6(_) => TcpBuilder::new_v6()?, - }; - match builder.bind(addr) { - Ok(builder) => match builder.reuse_address(true) { - Ok(builder) => { - succ = true; - let lst = builder.listen(self.backlog) - .expect("failed to set socket backlog"); - self.sockets.insert(lst.local_addr().unwrap(), lst); - }, - Err(e) => err = Some(e) + match create_tcp_listener(addr, self.backlog) { + Ok(lst) => { + succ = true; + self.sockets.insert(lst.local_addr().unwrap(), lst); }, Err(e) => err = Some(e), } @@ -309,7 +302,8 @@ impl HttpServer // start acceptors threads for (addr, sock) in addrs { info!("Starting http server on {}", addr); - start_accept_thread(sock, addr, workers.clone()); + self.accept.push( + start_accept_thread(sock, addr, self.backlog, workers.clone())); } // start http server actor @@ -328,7 +322,7 @@ impl HttpServer, net::SocketAddr, H, if self.sockets.is_empty() { Err(io::Error::new(io::ErrorKind::Other, "No socket addresses are bound")) } else { - let addrs: Vec<(net::SocketAddr, Socket)> = self.sockets.drain().collect(); + let addrs: Vec<(net::SocketAddr, net::TcpListener)> = self.sockets.drain().collect(); let settings = ServerSettings::new(Some(addrs[0].0), &self.host, false); let acceptor = match TlsAcceptor::builder(pkcs12) { Ok(builder) => { @@ -344,7 +338,8 @@ impl HttpServer, net::SocketAddr, H, // start acceptors threads for (addr, sock) in addrs { info!("Starting tls http server on {}", addr); - start_accept_thread(sock, addr, workers.clone()); + self.accept.push( + start_accept_thread(sock, addr, self.backlog, workers.clone())); } // start http server actor @@ -365,7 +360,7 @@ impl HttpServer, net::SocketAddr, H, if self.sockets.is_empty() { Err(io::Error::new(io::ErrorKind::Other, "No socket addresses are bound")) } else { - let addrs: Vec<(net::SocketAddr, Socket)> = self.sockets.drain().collect(); + let addrs: Vec<(net::SocketAddr, net::TcpListener)> = self.sockets.drain().collect(); let settings = ServerSettings::new(Some(addrs[0].0), &self.host, false); let acceptor = match SslAcceptorBuilder::mozilla_intermediate( SslMethod::tls(), &identity.pkey, &identity.cert, &identity.chain) @@ -383,7 +378,8 @@ impl HttpServer, net::SocketAddr, H, // start acceptors threads for (addr, sock) in addrs { info!("Starting tls http server on {}", addr); - start_accept_thread(sock, addr, workers.clone()); + self.accept.push( + start_accept_thread(sock, addr, workers.clone(), self.backlog)); } // start http server actor @@ -414,7 +410,8 @@ impl HttpServer // start acceptors threads for (addr, sock) in addrs { info!("Starting http server on {}", addr); - start_accept_thread(sock, addr, workers.clone()); + self.accept.push( + start_accept_thread(sock, addr, self.backlog, workers.clone())); } } @@ -436,18 +433,13 @@ impl HttpServer } } +#[derive(Message)] struct IoStream { io: T, peer: Option, http2: bool, } -impl ResponseType for IoStream -{ - type Item = (); - type Error = (); -} - impl StreamHandler, io::Error> for HttpServer where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static, @@ -473,6 +465,67 @@ impl Handler, io::Error> for HttpServer } } +/// Pause connection accepting process +#[derive(Message)] +pub struct PauseServer; + +/// Resume connection accepting process +#[derive(Message)] +pub struct ResumeServer; + +/// Stop connection processing and exit +#[derive(Message)] +pub struct StopServer; + +impl Handler for HttpServer + where T: AsyncRead + AsyncWrite + 'static, + H: HttpHandler + 'static, + U: 'static, + A: 'static, +{ + fn handle(&mut self, _: PauseServer, _: &mut Context) -> Response + { + for item in &self.accept { + let _ = item.1.send(Command::Pause); + let _ = item.0.set_readiness(mio::Ready::readable()); + } + Self::empty() + } +} + +impl Handler for HttpServer + where T: AsyncRead + AsyncWrite + 'static, + H: HttpHandler + 'static, + U: 'static, + A: 'static, +{ + fn handle(&mut self, _: ResumeServer, _: &mut Context) -> Response + { + for item in &self.accept { + let _ = item.1.send(Command::Resume); + let _ = item.0.set_readiness(mio::Ready::readable()); + } + Self::empty() + } +} + +impl Handler for HttpServer + where T: AsyncRead + AsyncWrite + 'static, + H: HttpHandler + 'static, + U: 'static, + A: 'static, +{ + fn handle(&mut self, _: StopServer, ctx: &mut Context) -> Response + { + for item in &self.accept { + let _ = item.1.send(Command::Stop); + let _ = item.0.set_readiness(mio::Ready::readable()); + } + ctx.stop(); + Self::empty() + } +} + /// Http worker /// /// Worker accepts Socket objects via unbounded channel and start requests processing. @@ -589,10 +642,11 @@ impl StreamHandlerType { let io = TcpStream::from_stream(io, hnd) .expect("failed to associate TCP stream"); - Arbiter::handle().spawn( + hnd.spawn( TlsAcceptorExt::accept_async(acceptor, io).then(move |res| { match res { - Ok(io) => hnd.spawn(HttpChannel::new(h, io, peer, http2)), + Ok(io) => Arbiter::handle().spawn( + HttpChannel::new(h, io, peer, http2)), Err(err) => trace!("Error during handling tls connection: {}", err), }; @@ -629,14 +683,27 @@ impl StreamHandlerType { } } -fn start_accept_thread(sock: net::TcpListener, addr: net::SocketAddr, - workers: Vec>>) { +enum Command { + Pause, + Resume, + Stop, +} + +fn start_accept_thread(sock: net::TcpListener, addr: net::SocketAddr, backlog: i32, + workers: Vec>>) + -> (mio::SetReadiness, sync_mpsc::Sender) +{ + let (tx, rx) = sync_mpsc::channel(); + let (reg, readiness) = mio::Registration::new2(); + // start accept thread let _ = thread::Builder::new().name(format!("Accept on {}", addr)).spawn(move || { - let mut next = 0; - let server = mio::net::TcpListener::from_listener(sock, &addr) - .expect("Can not create mio::net::TcpListener"); - const SERVER: mio::Token = mio::Token(0); + const SRV: mio::Token = mio::Token(0); + const CMD: mio::Token = mio::Token(1); + + let mut server = Some( + mio::net::TcpListener::from_listener(sock, &addr) + .expect("Can not create mio::net::TcpListener")); // Create a poll instance let poll = match mio::Poll::new() { @@ -645,14 +712,23 @@ fn start_accept_thread(sock: net::TcpListener, addr: net::SocketAddr, }; // Start listening for incoming connections - if let Err(err) = poll.register(&server, SERVER, + if let Some(ref srv) = server { + if let Err(err) = poll.register( + srv, SRV, mio::Ready::readable(), mio::PollOpt::edge()) { + panic!("Can not register io: {}", err); + } + } + + // Start listening for incommin commands + if let Err(err) = poll.register(®, CMD, mio::Ready::readable(), mio::PollOpt::edge()) { - panic!("Can not register io: {}", err); + panic!("Can not register Registration: {}", err); } // Create storage for events let mut events = mio::Events::with_capacity(128); + let mut next = 0; loop { if let Err(err) = poll.poll(&mut events, None) { panic!("Poll error: {}", err); @@ -660,27 +736,77 @@ fn start_accept_thread(sock: net::TcpListener, addr: net::SocketAddr, for event in events.iter() { match event.token() { - SERVER => { - loop { - match server.accept_std() { - Ok((sock, addr)) => { - let msg = IoStream{io: sock, peer: Some(addr), http2: false}; - workers[next] - .unbounded_send(msg).expect("worker thread died"); - next = (next + 1) % workers.len(); - }, - Err(err) => if err.kind() == io::ErrorKind::WouldBlock { - break - } else { - error!("Error accepting connection: {:?}", err); - return + SRV => { + if let Some(ref server) = server { + loop { + match server.accept_std() { + Ok((sock, addr)) => { + let msg = IoStream{ + io: sock, peer: Some(addr), http2: false}; + workers[next].unbounded_send(msg) + .expect("worker thread died"); + next = (next + 1) % workers.len(); + }, + Err(err) => if err.kind() == io::ErrorKind::WouldBlock { + break + } else { + error!("Error accepting connection: {:?}", err); + return + } } } } + }, + CMD => match rx.try_recv() { + Ok(cmd) => match cmd { + Command::Pause => if let Some(server) = server.take() { + if let Err(err) = poll.deregister(&server) { + error!("Can not deregister server socket {}", err); + } else { + info!("Paused accepting connections on {}", addr); + } + }, + Command::Resume => { + let lst = create_tcp_listener(addr, backlog) + .expect("Can not create net::TcpListener"); + + server = Some( + mio::net::TcpListener::from_listener(lst, &addr) + .expect("Can not create mio::net::TcpListener")); + + if let Some(ref server) = server { + if let Err(err) = poll.register( + server, SRV, mio::Ready::readable(), mio::PollOpt::edge()) + { + error!("Can not resume socket accept process: {}", err); + } else { + info!("Accepting connections on {} has been resumed", + addr); + } + } + }, + Command::Stop => return, + }, + Err(err) => match err { + sync_mpsc::TryRecvError::Empty => (), + sync_mpsc::TryRecvError::Disconnected => return, + } } _ => unreachable!(), } } } }); + + (readiness, tx) +} + +fn create_tcp_listener(addr: net::SocketAddr, backlog: i32) -> io::Result { + let builder = match addr { + net::SocketAddr::V4(_) => TcpBuilder::new_v4()?, + net::SocketAddr::V6(_) => TcpBuilder::new_v6()?, + }; + builder.bind(addr)?; + builder.reuse_address(true)?; + Ok(builder.listen(backlog)?) } diff --git a/tests/test_server.rs b/tests/test_server.rs index 6884d30f1..cfea669ba 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -2,10 +2,12 @@ extern crate actix; extern crate actix_web; extern crate tokio_core; extern crate reqwest; +extern crate futures; -use std::thread; +use std::{net, thread}; use std::sync::{Arc, mpsc}; use std::sync::atomic::{AtomicUsize, Ordering}; +use futures::Future; use actix_web::*; use actix::System; @@ -20,12 +22,22 @@ fn test_start() { let srv = HttpServer::new( || vec![Application::new() .resource("/", |r| r.method(Method::GET).h(httpcodes::HTTPOk))]); + let srv = srv.bind("127.0.0.1:0").unwrap(); - let _ = tx.send(srv.addrs()[0].clone()); - srv.start(); + let addr = srv.addrs()[0].clone(); + let srv_addr = srv.start(); + let _ = tx.send((addr, srv_addr)); sys.run(); }); - let addr = rx.recv().unwrap(); + let (addr, srv_addr) = rx.recv().unwrap(); + assert!(reqwest::get(&format!("http://{}/", addr)).unwrap().status().is_success()); + + // pause + let _ = srv_addr.call_fut(dev::PauseServer).wait(); + assert!(net::TcpStream::connect(addr).is_err()); + + // resume + let _ = srv_addr.call_fut(dev::ResumeServer).wait(); assert!(reqwest::get(&format!("http://{}/", addr)).unwrap().status().is_success()); }