From ffb07c8884eafaae17f578f2506d749ec86c01a3 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 9 Dec 2018 21:51:35 -0800 Subject: [PATCH] use actix-rt for server impl --- Cargo.toml | 6 +- actix-rt/src/arbiter.rs | 41 +++--- examples/basic.rs | 16 +- examples/ssl.rs | 14 +- src/framed.rs | 9 +- src/resolver.rs | 2 +- src/server/accept.rs | 58 +++----- src/server/builder.rs | 316 ++++++++++++++++++---------------------- src/server/config.rs | 2 +- src/server/mod.rs | 29 +--- src/server/server.rs | 63 ++++++++ src/server/services.rs | 2 +- src/stream.rs | 2 +- src/time.rs | 2 +- 14 files changed, 270 insertions(+), 292 deletions(-) create mode 100644 src/server/server.rs diff --git a/Cargo.toml b/Cargo.toml index fe733456..574ec314 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,7 +48,6 @@ rust-tls = ["rustls", "tokio-rustls", "webpki", "webpki-roots"] cell = [] [dependencies] -actix = "0.7.6" actix-service = "0.1.1" actix-codec = { path = "actix-codec" } actix-rt = { path = "actix-rt" } @@ -62,13 +61,12 @@ net2 = "0.2" bytes = "0.4" futures = "0.1" slab = "0.4" -tokio = "0.1" -tokio-codec = "0.1" tokio-io = "0.1" tokio-tcp = "0.1" tokio-timer = "0.2" tokio-reactor = "0.1" -tokio-current-thread = "0.1" +tokio-signal = "0.2" + trust-dns-proto = "^0.5.0" trust-dns-resolver = "^0.10.0" diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 56fffc88..36a5b9c6 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -234,29 +234,30 @@ impl Future for SystemArbiter { type Error = (); fn poll(&mut self) -> Poll { - match self.commands.poll() { - Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), - Ok(Async::Ready(Some(cmd))) => match cmd { - SystemCommand::Exit(code) => { - // stop arbiters - for arb in self.arbiters.values() { - arb.stop(); + loop { + match self.commands.poll() { + Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), + Ok(Async::Ready(Some(cmd))) => match cmd { + SystemCommand::Exit(code) => { + // stop arbiters + for arb in self.arbiters.values() { + arb.stop(); + } + // stop event loop + if let Some(stop) = self.stop.take() { + let _ = stop.send(code); + } } - // stop event loop - if let Some(stop) = self.stop.take() { - let _ = stop.send(code); + SystemCommand::RegisterArbiter(name, hnd) => { + self.arbiters.insert(name, hnd); } - } - SystemCommand::RegisterArbiter(name, hnd) => { - self.arbiters.insert(name, hnd); - } - SystemCommand::UnregisterArbiter(name) => { - self.arbiters.remove(&name); - } - }, - Ok(Async::NotReady) => return Ok(Async::NotReady), + SystemCommand::UnregisterArbiter(name) => { + self.arbiters.remove(&name); + } + }, + Ok(Async::NotReady) => return Ok(Async::NotReady), + } } - Ok(Async::NotReady) } } diff --git a/examples/basic.rs b/examples/basic.rs index f31b6d76..846a6a4e 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -1,15 +1,6 @@ //! simple composite service //! build: cargo run --example basic --features "ssl" //! to test: curl https://127.0.0.1:8443/ -k -extern crate actix; -extern crate actix_net; -extern crate env_logger; -extern crate futures; -extern crate openssl; -extern crate tokio_io; -extern crate tokio_openssl; -extern crate tokio_tcp; - use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc, @@ -22,7 +13,8 @@ use tokio_io::{AsyncRead, AsyncWrite}; use tokio_openssl::SslAcceptorExt; use actix_net::server::Server; -use actix_net::service::{IntoNewService, NewServiceExt}; +use actix_rt::System; +use actix_service::{IntoNewService, NewService}; /// Simple logger service, it just prints fact of the new connections fn logger( @@ -36,7 +28,7 @@ fn main() { env::set_var("RUST_LOG", "actix_net=trace"); env_logger::init(); - let sys = actix::System::new("test"); + let sys = System::new("test"); // load ssl keys let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap(); @@ -53,7 +45,7 @@ fn main() { // bind socket address and start workers. By default server uses number of // available logical cpu as threads count. actix net start separate // instances of service pipeline in each worker. - Server::default() + Server::build() .bind( // configure service pipeline "basic", diff --git a/examples/ssl.rs b/examples/ssl.rs index 62e3aac7..68dd97a8 100644 --- a/examples/ssl.rs +++ b/examples/ssl.rs @@ -1,10 +1,3 @@ -extern crate actix; -extern crate actix_net; -extern crate futures; -extern crate openssl; -extern crate tokio_io; -extern crate tokio_tcp; - use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc, @@ -15,8 +8,9 @@ use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod}; use tokio_io::{AsyncRead, AsyncWrite}; use actix_net::server::Server; -use actix_net::service::NewServiceExt; use actix_net::ssl; +use actix_rt::System; +use actix_service::NewService; #[derive(Debug)] struct ServiceState { @@ -33,7 +27,7 @@ fn service( } fn main() { - let sys = actix::System::new("test"); + let sys = System::new("test"); // load ssl keys let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap(); @@ -48,7 +42,7 @@ fn main() { let openssl = ssl::OpensslAcceptor::new(builder.build()); // server start mutiple workers, it runs supplied `Fn` in each worker. - Server::default() + Server::build() .bind("test-ssl", "0.0.0.0:8443", move || { let num = num.clone(); diff --git a/src/framed.rs b/src/framed.rs index c1385ddd..42e46ed6 100644 --- a/src/framed.rs +++ b/src/framed.rs @@ -2,13 +2,12 @@ use std::marker::PhantomData; use std::mem; -use actix; -use actix_codec::Framed; +use actix_codec::{Decoder, Encoder, Framed}; +use actix_rt::Arbiter; use actix_service::{IntoNewService, IntoService, NewService, Service}; use futures::future::{ok, FutureResult}; use futures::unsync::mpsc; use futures::{Async, AsyncSink, Future, Poll, Sink, Stream}; -use tokio_codec::{Decoder, Encoder}; use tokio_io::{AsyncRead, AsyncWrite}; type Request = ::Item; @@ -256,7 +255,7 @@ where Ok(Async::Ready(_)) => { if let Some(item) = self.request.take() { let sender = self.write_tx.clone(); - actix::Arbiter::spawn( + Arbiter::spawn( self.service .call(item) .then(|item| sender.send(item).map(|_| ()).map_err(|_| ())), @@ -281,7 +280,7 @@ where match self.service.poll_ready() { Ok(Async::Ready(_)) => { let sender = self.write_tx.clone(); - actix::Arbiter::spawn( + Arbiter::spawn( self.service .call(item) .then(|item| sender.send(item).map(|_| ()).map_err(|_| ())), diff --git a/src/resolver.rs b/src/resolver.rs index 9c67a2c1..bb580ca1 100644 --- a/src/resolver.rs +++ b/src/resolver.rs @@ -4,8 +4,8 @@ use std::net::IpAddr; use futures::{Async, Future, Poll}; +use actix_rt::spawn; use actix_service::Service; -use tokio_current_thread::spawn; use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; pub use trust_dns_resolver::error::ResolveError; use trust_dns_resolver::lookup_ip::LookupIpFuture; diff --git a/src/server/accept.rs b/src/server/accept.rs index a902b0fb..9426a7e3 100644 --- a/src/server/accept.rs +++ b/src/server/accept.rs @@ -2,15 +2,14 @@ use std::sync::mpsc as sync_mpsc; use std::time::{Duration, Instant}; use std::{io, net, thread}; -use futures::{sync::mpsc, Future}; +use actix_rt::System; +use futures::future::{lazy, Future}; use log::{error, info}; use mio; use slab::Slab; use tokio_timer::Delay; -use actix::{msgs::Execute, Arbiter, System}; - -use super::server::ServerCommand; +use super::server::Server; use super::worker::{Conn, WorkerClient}; use super::Token; @@ -54,14 +53,11 @@ pub(crate) struct AcceptLoop { notify_ready: mio::SetReadiness, tx: sync_mpsc::Sender, rx: Option>, - srv: Option<( - mpsc::UnboundedSender, - mpsc::UnboundedReceiver, - )>, + srv: Option, } impl AcceptLoop { - pub fn new() -> AcceptLoop { + pub fn new(srv: Server) -> AcceptLoop { let (tx, rx) = sync_mpsc::channel(); let (cmd_reg, cmd_ready) = mio::Registration::new2(); let (notify_reg, notify_ready) = mio::Registration::new2(); @@ -73,7 +69,7 @@ impl AcceptLoop { notify_ready, notify_reg: Some(notify_reg), rx: Some(rx), - srv: Some(mpsc::unbounded()), + srv: Some(srv), } } @@ -90,18 +86,17 @@ impl AcceptLoop { &mut self, socks: Vec<(Token, net::TcpListener)>, workers: Vec, - ) -> mpsc::UnboundedReceiver { - let (tx, rx) = self.srv.take().expect("Can not re-use AcceptInfo"); + ) { + let srv = self.srv.take().expect("Can not re-use AcceptInfo"); Accept::start( self.rx.take().expect("Can not re-use AcceptInfo"), self.cmd_reg.take().expect("Can not re-use AcceptInfo"), self.notify_reg.take().expect("Can not re-use AcceptInfo"), socks, - tx, + srv, workers, ); - rx } } @@ -110,7 +105,7 @@ struct Accept { rx: sync_mpsc::Receiver, sockets: Slab, workers: Vec, - srv: mpsc::UnboundedSender, + srv: Server, timer: (mio::Registration, mio::SetReadiness), next: usize, backpressure: bool, @@ -141,7 +136,7 @@ impl Accept { cmd_reg: mio::Registration, notify_reg: mio::Registration, socks: Vec<(Token, net::TcpListener)>, - srv: mpsc::UnboundedSender, + srv: Server, workers: Vec, ) { let sys = System::current(); @@ -181,7 +176,7 @@ impl Accept { rx: sync_mpsc::Receiver, socks: Vec<(Token, net::TcpListener)>, workers: Vec, - srv: mpsc::UnboundedSender, + srv: Server, ) -> Accept { // Create a poll instance let poll = match mio::Poll::new() { @@ -376,9 +371,7 @@ impl Accept { match self.workers[self.next].send(msg) { Ok(_) => (), Err(tmp) => { - let _ = self.srv.unbounded_send(ServerCommand::WorkerDied( - self.workers[self.next].idx, - )); + let _ = self.srv.worker_died(self.workers[self.next].idx); msg = tmp; self.workers.swap_remove(self.next); if self.workers.is_empty() { @@ -404,9 +397,7 @@ impl Accept { return; } Err(tmp) => { - let _ = self.srv.unbounded_send(ServerCommand::WorkerDied( - self.workers[self.next].idx, - )); + let _ = self.srv.worker_died(self.workers[self.next].idx); msg = tmp; self.workers.swap_remove(self.next); if self.workers.is_empty() { @@ -449,19 +440,14 @@ impl Accept { info.timeout = Some(Instant::now() + Duration::from_millis(500)); let r = self.timer.1.clone(); - System::current().arbiter().do_send(Execute::new( - move || -> Result<(), ()> { - Arbiter::spawn( - Delay::new(Instant::now() + Duration::from_millis(510)) - .map_err(|_| ()) - .and_then(move |_| { - let _ = r.set_readiness(mio::Ready::readable()); - Ok(()) - }), - ); - Ok(()) - }, - )); + System::current().arbiter().send(lazy(move || { + Delay::new(Instant::now() + Duration::from_millis(510)) + .map_err(|_| ()) + .and_then(move |_| { + let _ = r.set_readiness(mio::Ready::readable()); + Ok(()) + }) + })); return; } } diff --git a/src/server/builder.rs b/src/server/builder.rs index 4bd46c92..88516c5e 100644 --- a/src/server/builder.rs +++ b/src/server/builder.rs @@ -1,32 +1,27 @@ use std::time::Duration; use std::{io, mem, net}; -use actix_rt::Arbiter; -use futures::sync::{mpsc, mpsc::unbounded}; -use futures::{future::lazy, Future, Sink, Stream}; +use actix_rt::{spawn, Arbiter, System}; +use futures::future::{lazy, ok}; +use futures::stream::futures_unordered; +use futures::sync::mpsc::{unbounded, UnboundedReceiver}; +use futures::{Async, Future, Poll, Stream}; use log::{error, info}; use net2::TcpBuilder; use num_cpus; +use tokio_timer::sleep; -use actix::{ - actors::signal, fut, Actor, ActorFuture, Addr, AsyncContext, Context, Handler, Response, - StreamHandler, System, WrapFuture, -}; +// use actix::{actors::signal}; use super::accept::{AcceptLoop, AcceptNotify, Command}; use super::config::{ConfiguredService, ServiceConfig}; +use super::server::{Server, ServerCommand}; use super::services::{InternalServiceFactory, StreamNewService, StreamServiceFactory}; use super::services::{ServiceFactory, ServiceNewService}; use super::worker::{self, Worker, WorkerAvailability, WorkerClient}; -use super::{PauseServer, ResumeServer, StopServer, Token}; - -pub(crate) enum ServerCommand { - WorkerDied(usize), -} - -/// Server -pub struct Server {} +use super::Token; +/// Server builder pub struct ServerBuilder { threads: usize, token: Token, @@ -36,24 +31,35 @@ pub struct ServerBuilder { accept: AcceptLoop, exit: bool, shutdown_timeout: Duration, - signals: Option>, no_signals: bool, + cmd: UnboundedReceiver, + server: Server, +} + +impl Default for ServerBuilder { + fn default() -> Self { + Self::new() + } } impl ServerBuilder { - /// Create new Server instance - pub fn new() -> Server { + /// Create new Server builder instance + pub fn new() -> ServerBuilder { + let (tx, rx) = unbounded(); + let server = Server::new(tx); + ServerBuilder { threads: num_cpus::get(), token: Token(0), workers: Vec::new(), services: Vec::new(), sockets: Vec::new(), - accept: AcceptLoop::new(), + accept: AcceptLoop::new(server.clone()), exit: false, shutdown_timeout: Duration::from_secs(30), - signals: None, no_signals: false, + cmd: rx, + server, } } @@ -85,13 +91,6 @@ impl ServerBuilder { self } - #[doc(hidden)] - /// Set alternative address for `ProcessSignals` actor. - pub fn signals(mut self, addr: Addr) -> Self { - self.signals = Some(addr); - self - } - /// Disable signal handling pub fn disable_signals(mut self) -> Self { self.no_signals = true; @@ -115,7 +114,7 @@ impl ServerBuilder { /// /// This function is useful for moving parts of configuration to a /// different module or even library. - pub fn configure(mut self, f: F) -> io::Result + pub fn configure(mut self, f: F) -> io::Result where F: Fn(&mut ServiceConfig) -> io::Result<()>, { @@ -134,7 +133,7 @@ impl ServerBuilder { Ok(self) } - /// Add new service to server + /// Add new service to the server. pub fn bind>(mut self, name: N, addr: U, factory: F) -> io::Result where F: StreamServiceFactory, @@ -155,7 +154,7 @@ impl ServerBuilder { Ok(self) } - /// Add new service to server + /// Add new service to the server. pub fn listen>( mut self, name: N, @@ -175,7 +174,7 @@ impl ServerBuilder { self } - /// Add new service to server + /// Add new service to the server. pub fn listen2>( mut self, name: N, @@ -223,10 +222,10 @@ impl ServerBuilder { sys.run(); } - /// Starts Server Actor and returns its address + /// Starts processing incoming connections and return server controller. pub fn start(mut self) -> Server { if self.sockets.is_empty() { - panic!("Service should have at least one bound socket"); + panic!("Server should have at least one bound socket"); } else { info!("Starting {} workers", self.threads); @@ -242,33 +241,17 @@ impl ServerBuilder { for sock in &self.sockets { info!("Starting server on {}", sock.1.local_addr().ok().unwrap()); } - let rx = self - .accept + self.accept .start(mem::replace(&mut self.sockets, Vec::new()), workers); // start http server actor - let signals = self.subscribe_to_signals(); - let addr = Actor::create(move |ctx| { - ctx.add_stream(rx); - self - }); - if let Some(signals) = signals { - signals.do_send(signal::Subscribe(addr.clone().recipient())) - } - addr - } - } - - // subscribe to os signals - fn subscribe_to_signals(&self) -> Option> { - if !self.no_signals { - if let Some(ref signals) = self.signals { - Some(signals.clone()) - } else { - Some(System::current().registry().get::()) - } - } else { - None + // let signals = self.subscribe_to_signals(); + // if let Some(signals) = signals { + // signals.do_send(signal::Subscribe(addr.clone().recipient())) + // } + let server = self.server.clone(); + spawn(self); + server } } @@ -290,137 +273,122 @@ impl ServerBuilder { } } -impl Actor for Server { - type Context = Context; -} +// /// Signals support +// /// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system +// /// message to `System` actor. +// impl Handler for Server { +// type Result = (); -/// Signals support -/// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system -/// message to `System` actor. -impl Handler for Server { - type Result = (); +// fn handle(&mut self, msg: signal::Signal, ctx: &mut Context) { +// match msg.0 { +// signal::SignalType::Int => { +// info!("SIGINT received, exiting"); +// self.exit = true; +// Handler::::handle(self, StopServer { graceful: false }, ctx); +// } +// signal::SignalType::Term => { +// info!("SIGTERM received, stopping"); +// self.exit = true; +// Handler::::handle(self, StopServer { graceful: true }, ctx); +// } +// signal::SignalType::Quit => { +// info!("SIGQUIT received, exiting"); +// self.exit = true; +// Handler::::handle(self, StopServer { graceful: false }, ctx); +// } +// _ => (), +// } +// } +// } - fn handle(&mut self, msg: signal::Signal, ctx: &mut Context) { - match msg.0 { - signal::SignalType::Int => { - info!("SIGINT received, exiting"); - self.exit = true; - Handler::::handle(self, StopServer { graceful: false }, ctx); - } - signal::SignalType::Term => { - info!("SIGTERM received, stopping"); - self.exit = true; - Handler::::handle(self, StopServer { graceful: true }, ctx); - } - signal::SignalType::Quit => { - info!("SIGQUIT received, exiting"); - self.exit = true; - Handler::::handle(self, StopServer { graceful: false }, ctx); - } - _ => (), - } - } -} +impl Future for ServerBuilder { + type Item = (); + type Error = (); -impl Handler for Server { - type Result = (); + fn poll(&mut self) -> Poll { + loop { + match self.cmd.poll() { + Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(Some(item))) => match item { + ServerCommand::Pause(tx) => { + self.accept.send(Command::Pause); + let _ = tx.send(()); + } + ServerCommand::Resume(tx) => { + self.accept.send(Command::Resume); + let _ = tx.send(()); + } + ServerCommand::Stop { + graceful, + completion, + } => { + let exit = self.exit; - fn handle(&mut self, _: PauseServer, _: &mut Context) { - self.accept.send(Command::Pause); - } -} - -impl Handler for Server { - type Result = (); - - fn handle(&mut self, _: ResumeServer, _: &mut Context) { - self.accept.send(Command::Resume); - } -} - -impl Handler for Server { - type Result = Response<(), ()>; - - fn handle(&mut self, msg: StopServer, ctx: &mut Context) -> Self::Result { - // stop accept thread - self.accept.send(Command::Stop); - - // stop workers - let (tx, rx) = mpsc::channel(1); - - for worker in &self.workers { - let tx2 = tx.clone(); - ctx.spawn( - worker - .1 - .stop(msg.graceful) - .into_actor(self) - .then(move |_, slf, ctx| { - slf.workers.pop(); - if slf.workers.is_empty() { - let _ = tx2.send(()); + // stop accept thread + self.accept.send(Command::Stop); + // stop workers + if !self.workers.is_empty() { + spawn( + futures_unordered( + self.workers + .iter() + .map(move |worker| worker.1.stop(graceful)), + ) + .collect() + .then(move |_| { + let _ = completion.send(()); + if exit { + spawn(sleep(Duration::from_millis(300)).then(|_| { + System::current().stop(); + ok(()) + })); + } + ok(()) + }), + ) + } else { // we need to stop system if server was spawned - if slf.exit { - ctx.run_later(Duration::from_millis(300), |_, _| { + if self.exit { + spawn(sleep(Duration::from_millis(300)).then(|_| { System::current().stop(); - }); + ok(()) + })); } + let _ = completion.send(()); } - - fut::ok(()) - }), - ); - } - - if !self.workers.is_empty() { - Response::r#async(rx.into_future().map(|_| ()).map_err(|_| ())) - } else { - // we need to stop system if server was spawned - if self.exit { - ctx.run_later(Duration::from_millis(300), |_, _| { - System::current().stop(); - }); - } - Response::reply(Ok(())) - } - } -} - -/// Commands from accept threads -impl StreamHandler for Server { - fn finished(&mut self, _: &mut Context) {} - - fn handle(&mut self, msg: ServerCommand, _: &mut Context) { - match msg { - ServerCommand::WorkerDied(idx) => { - let mut found = false; - for i in 0..self.workers.len() { - if self.workers[i].0 == idx { - self.workers.swap_remove(i); - found = true; - break; } - } - - if found { - error!("Worker has died {:?}, restarting", idx); - - let mut new_idx = self.workers.len(); - 'found: loop { + ServerCommand::WorkerDied(idx) => { + let mut found = false; for i in 0..self.workers.len() { - if self.workers[i].0 == new_idx { - new_idx += 1; - continue 'found; + if self.workers[i].0 == idx { + self.workers.swap_remove(i); + found = true; + break; } } - break; - } - let worker = self.start_worker(new_idx, self.accept.get_notify()); - self.workers.push((new_idx, worker.clone())); - self.accept.send(Command::Worker(worker)); - } + if found { + error!("Worker has died {:?}, restarting", idx); + + let mut new_idx = self.workers.len(); + 'found: loop { + for i in 0..self.workers.len() { + if self.workers[i].0 == new_idx { + new_idx += 1; + continue 'found; + } + } + break; + } + + let worker = self.start_worker(new_idx, self.accept.get_notify()); + self.workers.push((new_idx, worker.clone())); + self.accept.send(Command::Worker(worker)); + } + } + }, } } } diff --git a/src/server/config.rs b/src/server/config.rs index 4409bcf7..85480978 100644 --- a/src/server/config.rs +++ b/src/server/config.rs @@ -8,7 +8,7 @@ use tokio_tcp::TcpStream; use crate::counter::CounterGuard; -use super::server::bind_addr; +use super::builder::bind_addr; use super::services::{ BoxedServerService, InternalServiceFactory, ServerMessage, StreamService, }; diff --git a/src/server/mod.rs b/src/server/mod.rs index d52db357..4ceb8ddf 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,40 +1,17 @@ -//! General purpose networking server - -use actix::Message; +//! General purpose tcp server mod accept; mod builder; mod config; +mod server; mod services; mod worker; pub use self::builder::ServerBuilder; pub use self::config::{ServiceConfig, ServiceRuntime}; +pub use self::server::Server; pub use self::services::{ServerMessage, ServiceFactory, StreamServiceFactory}; -/// Pause accepting incoming connections -/// -/// If socket contains some pending connection, they might be dropped. -/// All opened connection remains active. -#[derive(Message)] -pub struct PauseServer; - -/// Resume accepting incoming connections -#[derive(Message)] -pub struct ResumeServer; - -/// Stop incoming connection processing, stop all workers and exit. -/// -/// If server starts with `spawn()` method, then spawned thread get terminated. -pub struct StopServer { - /// Whether to try and shut down gracefully - pub graceful: bool, -} - -impl Message for StopServer { - type Result = Result<(), ()>; -} - /// Socket id token #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] pub(crate) struct Token(usize); diff --git a/src/server/server.rs b/src/server/server.rs new file mode 100644 index 00000000..cb1ce451 --- /dev/null +++ b/src/server/server.rs @@ -0,0 +1,63 @@ +use futures::sync::mpsc::UnboundedSender; +use futures::sync::oneshot; +use futures::Future; + +use super::builder::ServerBuilder; + +pub(crate) enum ServerCommand { + WorkerDied(usize), + Pause(oneshot::Sender<()>), + Resume(oneshot::Sender<()>), + /// Whether to try and shut down gracefully + Stop { + graceful: bool, + completion: oneshot::Sender<()>, + }, +} + +#[derive(Clone)] +pub struct Server(UnboundedSender); + +impl Server { + pub(crate) fn new(tx: UnboundedSender) -> Self { + Server(tx) + } + + /// Start server building process + pub fn build() -> ServerBuilder { + ServerBuilder::default() + } + + pub(crate) fn worker_died(&self, idx: usize) { + let _ = self.0.unbounded_send(ServerCommand::WorkerDied(idx)); + } + + /// Pause accepting incoming connections + /// + /// If socket contains some pending connection, they might be dropped. + /// All opened connection remains active. + pub fn pause(&self) -> impl Future { + let (tx, rx) = oneshot::channel(); + let _ = self.0.unbounded_send(ServerCommand::Pause(tx)); + rx.map_err(|_| ()) + } + + /// Resume accepting incoming connections + pub fn resume(&self) -> impl Future { + let (tx, rx) = oneshot::channel(); + let _ = self.0.unbounded_send(ServerCommand::Resume(tx)); + rx.map_err(|_| ()) + } + + /// Stop incoming connection processing, stop all workers and exit. + /// + /// If server starts with `spawn()` method, then spawned thread get terminated. + pub fn stop(&self, graceful: bool) -> impl Future { + let (tx, rx) = oneshot::channel(); + let _ = self.0.unbounded_send(ServerCommand::Stop { + graceful, + completion: tx, + }); + rx.map_err(|_| ()) + } +} diff --git a/src/server/services.rs b/src/server/services.rs index 562914a0..30261187 100644 --- a/src/server/services.rs +++ b/src/server/services.rs @@ -1,11 +1,11 @@ use std::net; use std::time::Duration; +use actix_rt::spawn; use actix_service::{NewService, Service}; use futures::future::{err, ok, FutureResult}; use futures::{Future, Poll}; use log::error; -use tokio_current_thread::spawn; use tokio_reactor::Handle; use tokio_tcp::TcpStream; diff --git a/src/stream.rs b/src/stream.rs index 618575a6..46519109 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,9 +1,9 @@ use std::marker::PhantomData; +use actix_rt::spawn; use actix_service::{IntoService, NewService, Service}; use futures::unsync::mpsc; use futures::{future, Async, Future, Poll, Stream}; -use tokio_current_thread::spawn; pub struct StreamDispatcher { stream: S, diff --git a/src/time.rs b/src/time.rs index 27e1d3a8..9679ae22 100644 --- a/src/time.rs +++ b/src/time.rs @@ -1,9 +1,9 @@ use std::time::{Duration, Instant}; +use actix_rt::spawn; use actix_service::{NewService, Service}; use futures::future::{ok, FutureResult}; use futures::{Async, Future, Poll}; -use tokio_current_thread::spawn; use tokio_timer::sleep; use super::cell::Cell;