From cdd6904aa0f6ae05e442c6e274e32ccfb1487f31 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 9 Dec 2018 20:30:04 -0800 Subject: [PATCH] rename Server to ServerBuilder --- Cargo.toml | 1 + actix-rt/src/arbiter.rs | 60 +++++++++++++++++++++------- actix-rt/src/lib.rs | 19 ++++++++- actix-rt/src/system.rs | 3 +- src/server/{server.rs => builder.rs} | 25 +++++------- src/server/mod.rs | 4 +- src/server/worker.rs | 13 +++--- 7 files changed, 83 insertions(+), 42 deletions(-) rename src/server/{server.rs => builder.rs} (96%) diff --git a/Cargo.toml b/Cargo.toml index 686103f9..fe733456 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ cell = [] actix = "0.7.6" actix-service = "0.1.1" actix-codec = { path = "actix-codec" } +actix-rt = { path = "actix-rt" } log = "0.4" num_cpus = "1.0" diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index e7492578..56fffc88 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -5,7 +5,7 @@ use std::cell::{Cell, RefCell}; use std::collections::HashMap; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::thread; +use std::{fmt, thread}; use futures::sync::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use futures::sync::oneshot::{channel, Sender}; @@ -23,9 +23,18 @@ thread_local!( pub(crate) const COUNT: AtomicUsize = AtomicUsize::new(0); -#[derive(Debug)] pub(crate) enum ArbiterCommand { Stop, + Execute(Box + Send>), +} + +impl fmt::Debug for ArbiterCommand { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + ArbiterCommand::Stop => write!(f, "ArbiterCommand::Stop"), + ArbiterCommand::Execute(_) => write!(f, "ArbiterCommand::Execute"), + } + } } #[derive(Debug, Clone)] @@ -49,6 +58,14 @@ impl Arbiter { arb } + /// Returns current arbiter's address + pub fn current() -> Arbiter { + ADDR.with(|cell| match *cell.borrow() { + Some(ref addr) => addr.clone(), + None => panic!("Arbiter is not running"), + }) + } + /// Stop arbiter pub fn stop(&self) { let _ = self.0.unbounded_send(ArbiterCommand::Stop); @@ -113,7 +130,7 @@ impl Arbiter { RUNNING.with(|cell| cell.set(false)); } - /// Executes a future on the current thread. + /// Spawn a future on the current thread. pub fn spawn(future: F) where F: Future + 'static, @@ -135,6 +152,16 @@ impl Arbiter { { Arbiter::spawn(future::lazy(f)) } + + /// Send a future on the arbiter's thread and spawn. + pub fn send(&self, future: F) + where + F: Future + Send + 'static, + { + let _ = self + .0 + .unbounded_send(ArbiterCommand::Execute(Box::new(future))); + } } struct ArbiterController { @@ -158,17 +185,22 @@ impl Future for ArbiterController { type Error = (); fn poll(&mut self) -> Poll { - match self.rx.poll() { - Ok(Async::Ready(None)) | Err(_) => Ok(Async::Ready(())), - Ok(Async::Ready(Some(item))) => match item { - ArbiterCommand::Stop => { - if let Some(stop) = self.stop.take() { - let _ = stop.send(0); - }; - Ok(Async::Ready(())) - } - }, - Ok(Async::NotReady) => Ok(Async::NotReady), + loop { + match self.rx.poll() { + Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), + Ok(Async::Ready(Some(item))) => match item { + ArbiterCommand::Stop => { + if let Some(stop) = self.stop.take() { + let _ = stop.send(0); + }; + return Ok(Async::Ready(())); + } + ArbiterCommand::Execute(fut) => { + spawn(fut); + } + }, + Ok(Async::NotReady) => return Ok(Async::NotReady), + } } } } diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index ef390b97..c2db6caf 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -5,8 +5,23 @@ mod builder; mod runtime; mod system; +pub use self::arbiter::Arbiter; pub use self::builder::{Builder, SystemRunner}; pub use self::runtime::{Handle, Runtime}; pub use self::system::System; -// pub use tokio_current_thread::spawn; -// pub use tokio_current_thread::TaskExecutor; + +/// Spawns a future on the current arbiter. +/// +/// # Panics +/// +/// This function panics if actix system is not running. +pub fn spawn(f: F) +where + F: futures::Future + 'static, +{ + if !System::is_set() { + panic!("System is not running"); + } + + Arbiter::spawn(f); +} diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index a1a4e2f2..d9e093db 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -58,8 +58,7 @@ impl System { } /// Set current running system. - #[doc(hidden)] - pub(crate) fn _is_set() -> bool { + pub(crate) fn is_set() -> bool { CURRENT.with(|cell| cell.borrow().is_some()) } diff --git a/src/server/server.rs b/src/server/builder.rs similarity index 96% rename from src/server/server.rs rename to src/server/builder.rs index 34605362..4bd46c92 100644 --- a/src/server/server.rs +++ b/src/server/builder.rs @@ -1,15 +1,16 @@ use std::time::Duration; use std::{io, mem, net}; +use actix_rt::Arbiter; use futures::sync::{mpsc, mpsc::unbounded}; -use futures::{Future, Sink, Stream}; +use futures::{future::lazy, Future, Sink, Stream}; use log::{error, info}; use net2::TcpBuilder; use num_cpus; use actix::{ - actors::signal, fut, msgs::Execute, Actor, ActorFuture, Addr, Arbiter, AsyncContext, - Context, Handler, Response, StreamHandler, System, WrapFuture, + actors::signal, fut, Actor, ActorFuture, Addr, AsyncContext, Context, Handler, Response, + StreamHandler, System, WrapFuture, }; use super::accept::{AcceptLoop, AcceptNotify, Command}; @@ -24,7 +25,9 @@ pub(crate) enum ServerCommand { } /// Server -pub struct Server { +pub struct Server {} + +pub struct ServerBuilder { threads: usize, token: Token, workers: Vec<(usize, WorkerClient)>, @@ -37,16 +40,10 @@ pub struct Server { no_signals: bool, } -impl Default for Server { - fn default() -> Self { - Self::new() - } -} - -impl Server { +impl ServerBuilder { /// Create new Server instance pub fn new() -> Server { - Server { + ServerBuilder { threads: num_cpus::get(), token: Token(0), workers: Vec::new(), @@ -227,7 +224,7 @@ impl Server { } /// Starts Server Actor and returns its address - pub fn start(mut self) -> Addr { + pub fn start(mut self) -> Server { if self.sockets.is_empty() { panic!("Service should have at least one bound socket"); } else { @@ -284,7 +281,7 @@ impl Server { let services: Vec> = self.services.iter().map(|v| v.clone_factory()).collect(); - Arbiter::new(format!("actix-net-worker-{}", idx)).do_send(Execute::new(move || { + Arbiter::new().send(lazy(move || { Worker::start(rx1, rx2, services, avail, timeout); Ok::<_, ()>(()) })); diff --git a/src/server/mod.rs b/src/server/mod.rs index 24d20b15..d52db357 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -3,13 +3,13 @@ use actix::Message; mod accept; +mod builder; mod config; -mod server; mod services; mod worker; +pub use self::builder::ServerBuilder; pub use self::config::{ServiceConfig, ServiceRuntime}; -pub use self::server::Server; pub use self::services::{ServerMessage, ServiceFactory, StreamServiceFactory}; /// Pause accepting incoming connections diff --git a/src/server/worker.rs b/src/server/worker.rs index 37682a8b..0a944c7b 100644 --- a/src/server/worker.rs +++ b/src/server/worker.rs @@ -2,16 +2,13 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; use std::{mem, net, time}; +use actix_rt::{spawn, Arbiter}; use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use futures::sync::oneshot; use futures::{future, Async, Future, Poll, Stream}; use log::{error, info, trace}; -use tokio_current_thread::spawn; use tokio_timer::{sleep, Delay}; -use actix::msgs::StopArbiter; -use actix::{Arbiter, Message}; - use super::accept::AcceptNotify; use super::services::{BoxedServerService, InternalServiceFactory, ServerMessage}; use super::Token; @@ -26,7 +23,7 @@ pub(crate) struct StopCommand { result: oneshot::Sender, } -#[derive(Debug, Message)] +#[derive(Debug)] pub(crate) struct Conn { pub io: net::TcpStream, pub token: Token, @@ -167,7 +164,7 @@ impl Worker { future::join_all(fut) .map_err(|e| { error!("Can not start worker: {:?}", e); - Arbiter::current().do_send(StopArbiter(0)); + Arbiter::current().stop(); }) .and_then(move |services| { for item in services { @@ -365,7 +362,7 @@ impl Future for Worker { let num = num_connections(); if num == 0 { let _ = tx.send(true); - Arbiter::current().do_send(StopArbiter(0)); + Arbiter::current().stop(); return Ok(Async::Ready(())); } @@ -375,7 +372,7 @@ impl Future for Worker { Async::Ready(_) => { self.shutdown(true); let _ = tx.send(false); - Arbiter::current().do_send(StopArbiter(0)); + Arbiter::current().stop(); return Ok(Async::Ready(())); } }