mirror of
https://github.com/fafhrd91/actix-net
synced 2025-01-19 02:41:49 +01:00
store the thread's handle with arbiter (#60)
This commit is contained in:
parent
2e8c2c7733
commit
ed5023128b
@ -6,6 +6,10 @@
|
|||||||
|
|
||||||
* Fix arbiter's thread panic message.
|
* Fix arbiter's thread panic message.
|
||||||
|
|
||||||
|
### Added
|
||||||
|
|
||||||
|
* Allow to join arbiter's thread. #60
|
||||||
|
|
||||||
|
|
||||||
## [0.2.5] - 2019-09-02
|
## [0.2.5] - 2019-09-02
|
||||||
|
|
||||||
|
@ -39,11 +39,20 @@ impl fmt::Debug for ArbiterCommand {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug)]
|
||||||
/// Arbiters provide an asynchronous execution environment for actors, functions
|
/// Arbiters provide an asynchronous execution environment for actors, functions
|
||||||
/// and futures. When an Arbiter is created, they spawn a new OS thread, and
|
/// and futures. When an Arbiter is created, they spawn a new OS thread, and
|
||||||
/// host an event loop. Some Arbiter functions execute on the current thread.
|
/// host an event loop. Some Arbiter functions execute on the current thread.
|
||||||
pub struct Arbiter(UnboundedSender<ArbiterCommand>);
|
pub struct Arbiter {
|
||||||
|
sender: UnboundedSender<ArbiterCommand>,
|
||||||
|
thread_handle: Option<thread::JoinHandle<()>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Clone for Arbiter {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self::with_sender(self.sender.clone())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Default for Arbiter {
|
impl Default for Arbiter {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
@ -55,7 +64,7 @@ impl Arbiter {
|
|||||||
pub(crate) fn new_system() -> Self {
|
pub(crate) fn new_system() -> Self {
|
||||||
let (tx, rx) = unbounded();
|
let (tx, rx) = unbounded();
|
||||||
|
|
||||||
let arb = Arbiter(tx);
|
let arb = Arbiter::with_sender(tx);
|
||||||
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
|
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
|
||||||
RUNNING.with(|cell| cell.set(false));
|
RUNNING.with(|cell| cell.set(false));
|
||||||
STORAGE.with(|cell| cell.borrow_mut().clear());
|
STORAGE.with(|cell| cell.borrow_mut().clear());
|
||||||
@ -75,7 +84,7 @@ impl Arbiter {
|
|||||||
|
|
||||||
/// Stop arbiter from continuing it's event loop.
|
/// Stop arbiter from continuing it's event loop.
|
||||||
pub fn stop(&self) {
|
pub fn stop(&self) {
|
||||||
let _ = self.0.unbounded_send(ArbiterCommand::Stop);
|
let _ = self.sender.unbounded_send(ArbiterCommand::Stop);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Spawn new thread and run event loop in spawned thread.
|
/// Spawn new thread and run event loop in spawned thread.
|
||||||
@ -87,9 +96,9 @@ impl Arbiter {
|
|||||||
let (arb_tx, arb_rx) = unbounded();
|
let (arb_tx, arb_rx) = unbounded();
|
||||||
let arb_tx2 = arb_tx.clone();
|
let arb_tx2 = arb_tx.clone();
|
||||||
|
|
||||||
let _ = thread::Builder::new().name(name.clone()).spawn(move || {
|
let handle = thread::Builder::new().name(name.clone()).spawn(move || {
|
||||||
let mut rt = Builder::new().build_rt().expect("Can not create Runtime");
|
let mut rt = Builder::new().build_rt().expect("Can not create Runtime");
|
||||||
let arb = Arbiter(arb_tx);
|
let arb = Arbiter::with_sender(arb_tx);
|
||||||
|
|
||||||
let (stop, stop_rx) = channel();
|
let (stop, stop_rx) = channel();
|
||||||
RUNNING.with(|cell| cell.set(true));
|
RUNNING.with(|cell| cell.set(true));
|
||||||
@ -119,9 +128,9 @@ impl Arbiter {
|
|||||||
let _ = System::current()
|
let _ = System::current()
|
||||||
.sys()
|
.sys()
|
||||||
.unbounded_send(SystemCommand::UnregisterArbiter(id));
|
.unbounded_send(SystemCommand::UnregisterArbiter(id));
|
||||||
});
|
}).unwrap_or_else(|err| panic!("Cannot spawn an arbiter's thread {:?}: {:?}", &name, err));
|
||||||
|
|
||||||
Arbiter(arb_tx2)
|
Arbiter{sender: arb_tx2, thread_handle: Some(handle)}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn run_system() {
|
pub(crate) fn run_system() {
|
||||||
@ -171,7 +180,7 @@ impl Arbiter {
|
|||||||
F: Future<Item = (), Error = ()> + Send + 'static,
|
F: Future<Item = (), Error = ()> + Send + 'static,
|
||||||
{
|
{
|
||||||
let _ = self
|
let _ = self
|
||||||
.0
|
.sender
|
||||||
.unbounded_send(ArbiterCommand::Execute(Box::new(future)));
|
.unbounded_send(ArbiterCommand::Execute(Box::new(future)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -182,7 +191,7 @@ impl Arbiter {
|
|||||||
F: FnOnce() + Send + 'static,
|
F: FnOnce() + Send + 'static,
|
||||||
{
|
{
|
||||||
let _ = self
|
let _ = self
|
||||||
.0
|
.sender
|
||||||
.unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || {
|
.unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || {
|
||||||
f();
|
f();
|
||||||
})));
|
})));
|
||||||
@ -198,7 +207,7 @@ impl Arbiter {
|
|||||||
{
|
{
|
||||||
let (tx, rx) = channel();
|
let (tx, rx) = channel();
|
||||||
let _ = self
|
let _ = self
|
||||||
.0
|
.sender
|
||||||
.unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || {
|
.unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || {
|
||||||
if !tx.is_canceled() {
|
if !tx.is_canceled() {
|
||||||
let _ = tx.send(f());
|
let _ = tx.send(f());
|
||||||
@ -250,6 +259,20 @@ impl Arbiter {
|
|||||||
f(item)
|
f(item)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn with_sender(sender: UnboundedSender<ArbiterCommand>) -> Self {
|
||||||
|
Self{sender, thread_handle: None}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wait for the event loop to stop by joining the underlying thread (if have Some).
|
||||||
|
pub fn join(&mut self) -> thread::Result<()>{
|
||||||
|
if let Some(thread_handle) = self.thread_handle.take() {
|
||||||
|
thread_handle.join()
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct ArbiterController {
|
struct ArbiterController {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user