1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-08-15 20:23:53 +02:00

Compare commits

..

4 Commits

Author SHA1 Message Date
Nikolay Kim
9fa2a36b4e prepare actix-rt release 2019-11-14 17:33:28 +06:00
Ivan Ladelshchikov
ed5023128b store the thread's handle with arbiter (#60) 2019-11-14 15:07:33 +06:00
Nikolay Kim
2e8c2c7733 Re-register task on every future poll 2019-10-14 17:55:52 +06:00
Nikolay Kim
115e82329f fix arbiter thread panic message 2019-10-14 11:19:08 +06:00
10 changed files with 70 additions and 17 deletions

View File

@@ -1,5 +1,11 @@
# Changes # Changes
## [0.1.1] - 2019-10-14
* Re-register task on every dispatcher poll.
## [0.1.0] - 2019-09-25 ## [0.1.0] - 2019-09-25
* Initial release * Initial release

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "actix-ioframe" name = "actix-ioframe"
version = "0.1.0" version = "0.1.1"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix framed service" description = "Actix framed service"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]

View File

@@ -29,6 +29,10 @@ impl<T> Cell<T> {
} }
} }
pub(crate) unsafe fn get_ref(&mut self) -> &T {
&*self.inner.as_ref().get()
}
pub(crate) unsafe fn get_mut(&mut self) -> &mut T { pub(crate) unsafe fn get_mut(&mut self) -> &mut T {
&mut *self.inner.as_ref().get() &mut *self.inner.as_ref().get()
} }

View File

@@ -154,7 +154,6 @@ where
}; };
let mut cell = self.inner.clone(); let mut cell = self.inner.clone();
unsafe { cell.get_mut().task.register() };
tokio_current_thread::spawn( tokio_current_thread::spawn(
self.service self.service
.call(Item::new(self.state.clone(), self.sink.clone(), item)) .call(Item::new(self.state.clone(), self.sink.clone(), item))
@@ -275,6 +274,8 @@ where
type Error = ServiceError<S::Error, U>; type Error = ServiceError<S::Error, U>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
unsafe { self.inner.get_ref().task.register() };
match mem::replace(&mut self.dispatch_state, FramedState::Processing) { match mem::replace(&mut self.dispatch_state, FramedState::Processing) {
FramedState::Processing => { FramedState::Processing => {
if self.poll_read() || self.poll_write() { if self.poll_read() || self.poll_write() {

View File

@@ -1,5 +1,16 @@
# Changes # Changes
## [0.2.6] - 2019-11-14
### Fixed
* Fix arbiter's thread panic message.
### Added
* Allow to join arbiter's thread. #60
## [0.2.5] - 2019-09-02 ## [0.2.5] - 2019-09-02
### Added ### Added

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "actix-rt" name = "actix-rt"
version = "0.2.5" version = "0.2.6"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix runtime" description = "Actix runtime"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]

View File

@@ -39,11 +39,20 @@ impl fmt::Debug for ArbiterCommand {
} }
} }
#[derive(Debug, Clone)] #[derive(Debug)]
/// Arbiters provide an asynchronous execution environment for actors, functions /// Arbiters provide an asynchronous execution environment for actors, functions
/// and futures. When an Arbiter is created, they spawn a new OS thread, and /// and futures. When an Arbiter is created, they spawn a new OS thread, and
/// host an event loop. Some Arbiter functions execute on the current thread. /// host an event loop. Some Arbiter functions execute on the current thread.
pub struct Arbiter(UnboundedSender<ArbiterCommand>); pub struct Arbiter {
sender: UnboundedSender<ArbiterCommand>,
thread_handle: Option<thread::JoinHandle<()>>,
}
impl Clone for Arbiter {
fn clone(&self) -> Self {
Self::with_sender(self.sender.clone())
}
}
impl Default for Arbiter { impl Default for Arbiter {
fn default() -> Self { fn default() -> Self {
@@ -55,7 +64,7 @@ impl Arbiter {
pub(crate) fn new_system() -> Self { pub(crate) fn new_system() -> Self {
let (tx, rx) = unbounded(); let (tx, rx) = unbounded();
let arb = Arbiter(tx); let arb = Arbiter::with_sender(tx);
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
RUNNING.with(|cell| cell.set(false)); RUNNING.with(|cell| cell.set(false));
STORAGE.with(|cell| cell.borrow_mut().clear()); STORAGE.with(|cell| cell.borrow_mut().clear());
@@ -75,7 +84,7 @@ impl Arbiter {
/// Stop arbiter from continuing it's event loop. /// Stop arbiter from continuing it's event loop.
pub fn stop(&self) { pub fn stop(&self) {
let _ = self.0.unbounded_send(ArbiterCommand::Stop); let _ = self.sender.unbounded_send(ArbiterCommand::Stop);
} }
/// Spawn new thread and run event loop in spawned thread. /// Spawn new thread and run event loop in spawned thread.
@@ -87,9 +96,9 @@ impl Arbiter {
let (arb_tx, arb_rx) = unbounded(); let (arb_tx, arb_rx) = unbounded();
let arb_tx2 = arb_tx.clone(); let arb_tx2 = arb_tx.clone();
let _ = thread::Builder::new().name(name.clone()).spawn(move || { let handle = thread::Builder::new().name(name.clone()).spawn(move || {
let mut rt = Builder::new().build_rt().expect("Can not create Runtime"); let mut rt = Builder::new().build_rt().expect("Can not create Runtime");
let arb = Arbiter(arb_tx); let arb = Arbiter::with_sender(arb_tx);
let (stop, stop_rx) = channel(); let (stop, stop_rx) = channel();
RUNNING.with(|cell| cell.set(true)); RUNNING.with(|cell| cell.set(true));
@@ -119,9 +128,9 @@ impl Arbiter {
let _ = System::current() let _ = System::current()
.sys() .sys()
.unbounded_send(SystemCommand::UnregisterArbiter(id)); .unbounded_send(SystemCommand::UnregisterArbiter(id));
}); }).unwrap_or_else(|err| panic!("Cannot spawn an arbiter's thread {:?}: {:?}", &name, err));
Arbiter(arb_tx2) Arbiter{sender: arb_tx2, thread_handle: Some(handle)}
} }
pub(crate) fn run_system() { pub(crate) fn run_system() {
@@ -171,7 +180,7 @@ impl Arbiter {
F: Future<Item = (), Error = ()> + Send + 'static, F: Future<Item = (), Error = ()> + Send + 'static,
{ {
let _ = self let _ = self
.0 .sender
.unbounded_send(ArbiterCommand::Execute(Box::new(future))); .unbounded_send(ArbiterCommand::Execute(Box::new(future)));
} }
@@ -182,7 +191,7 @@ impl Arbiter {
F: FnOnce() + Send + 'static, F: FnOnce() + Send + 'static,
{ {
let _ = self let _ = self
.0 .sender
.unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || { .unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || {
f(); f();
}))); })));
@@ -198,7 +207,7 @@ impl Arbiter {
{ {
let (tx, rx) = channel(); let (tx, rx) = channel();
let _ = self let _ = self
.0 .sender
.unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || { .unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || {
if !tx.is_canceled() { if !tx.is_canceled() {
let _ = tx.send(f()); let _ = tx.send(f());
@@ -250,6 +259,20 @@ impl Arbiter {
f(item) f(item)
}) })
} }
fn with_sender(sender: UnboundedSender<ArbiterCommand>) -> Self {
Self{sender, thread_handle: None}
}
/// Wait for the event loop to stop by joining the underlying thread (if have Some).
pub fn join(&mut self) -> thread::Result<()>{
if let Some(thread_handle) = self.thread_handle.take() {
thread_handle.join()
}
else {
Ok(())
}
}
} }
struct ArbiterController { struct ArbiterController {
@@ -260,9 +283,11 @@ struct ArbiterController {
impl Drop for ArbiterController { impl Drop for ArbiterController {
fn drop(&mut self) { fn drop(&mut self) {
if thread::panicking() { if thread::panicking() {
eprintln!("Panic in Arbiter thread, shutting down system.");
if System::current().stop_on_panic() { if System::current().stop_on_panic() {
eprintln!("Panic in Arbiter thread, shutting down system.");
System::current().stop_with_code(1) System::current().stop_with_code(1)
} else {
eprintln!("Panic in Arbiter thread.");
} }
} }
} }

View File

@@ -1,5 +1,10 @@
# Changes # Changes
## [0.4.7] - 2019-10-14
* Re-register task on every framed transport poll.
## [0.4.6] - 2019-10-08 ## [0.4.6] - 2019-10-08
* Refactor `Counter` type. register current task in available method. * Refactor `Counter` type. register current task in available method.

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "actix-utils" name = "actix-utils"
version = "0.4.6" version = "0.4.7"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix utils - various actix net related services" description = "Actix utils - various actix net related services"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]

View File

@@ -129,7 +129,6 @@ where
}; };
let mut cell = self.inner.clone(); let mut cell = self.inner.clone();
cell.get_mut().task.register();
tokio_current_thread::spawn(self.service.call(item).then(move |item| { tokio_current_thread::spawn(self.service.call(item).then(move |item| {
let inner = cell.get_mut(); let inner = cell.get_mut();
inner.buf.push_back(item); inner.buf.push_back(item);
@@ -293,6 +292,8 @@ where
type Error = FramedTransportError<S::Error, U>; type Error = FramedTransportError<S::Error, U>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.get_ref().task.register();
match mem::replace(&mut self.state, TransportState::Processing) { match mem::replace(&mut self.state, TransportState::Processing) {
TransportState::Processing => { TransportState::Processing => {
if self.poll_read() || self.poll_write() { if self.poll_read() || self.poll_write() {