mirror of
https://github.com/fafhrd91/actix-net
synced 2025-08-15 20:23:53 +02:00
Compare commits
4 Commits
testing-v0
...
rt-0.2.6
Author | SHA1 | Date | |
---|---|---|---|
|
9fa2a36b4e | ||
|
ed5023128b | ||
|
2e8c2c7733 | ||
|
115e82329f |
@@ -1,5 +1,11 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
|
|
||||||
|
## [0.1.1] - 2019-10-14
|
||||||
|
|
||||||
|
* Re-register task on every dispatcher poll.
|
||||||
|
|
||||||
|
|
||||||
## [0.1.0] - 2019-09-25
|
## [0.1.0] - 2019-09-25
|
||||||
|
|
||||||
* Initial release
|
* Initial release
|
||||||
|
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actix-ioframe"
|
name = "actix-ioframe"
|
||||||
version = "0.1.0"
|
version = "0.1.1"
|
||||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||||
description = "Actix framed service"
|
description = "Actix framed service"
|
||||||
keywords = ["network", "framework", "async", "futures"]
|
keywords = ["network", "framework", "async", "futures"]
|
||||||
|
@@ -29,6 +29,10 @@ impl<T> Cell<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) unsafe fn get_ref(&mut self) -> &T {
|
||||||
|
&*self.inner.as_ref().get()
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) unsafe fn get_mut(&mut self) -> &mut T {
|
pub(crate) unsafe fn get_mut(&mut self) -> &mut T {
|
||||||
&mut *self.inner.as_ref().get()
|
&mut *self.inner.as_ref().get()
|
||||||
}
|
}
|
||||||
|
@@ -154,7 +154,6 @@ where
|
|||||||
};
|
};
|
||||||
|
|
||||||
let mut cell = self.inner.clone();
|
let mut cell = self.inner.clone();
|
||||||
unsafe { cell.get_mut().task.register() };
|
|
||||||
tokio_current_thread::spawn(
|
tokio_current_thread::spawn(
|
||||||
self.service
|
self.service
|
||||||
.call(Item::new(self.state.clone(), self.sink.clone(), item))
|
.call(Item::new(self.state.clone(), self.sink.clone(), item))
|
||||||
@@ -275,6 +274,8 @@ where
|
|||||||
type Error = ServiceError<S::Error, U>;
|
type Error = ServiceError<S::Error, U>;
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
|
unsafe { self.inner.get_ref().task.register() };
|
||||||
|
|
||||||
match mem::replace(&mut self.dispatch_state, FramedState::Processing) {
|
match mem::replace(&mut self.dispatch_state, FramedState::Processing) {
|
||||||
FramedState::Processing => {
|
FramedState::Processing => {
|
||||||
if self.poll_read() || self.poll_write() {
|
if self.poll_read() || self.poll_write() {
|
||||||
|
@@ -1,5 +1,16 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
|
## [0.2.6] - 2019-11-14
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
|
||||||
|
* Fix arbiter's thread panic message.
|
||||||
|
|
||||||
|
### Added
|
||||||
|
|
||||||
|
* Allow to join arbiter's thread. #60
|
||||||
|
|
||||||
|
|
||||||
## [0.2.5] - 2019-09-02
|
## [0.2.5] - 2019-09-02
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actix-rt"
|
name = "actix-rt"
|
||||||
version = "0.2.5"
|
version = "0.2.6"
|
||||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||||
description = "Actix runtime"
|
description = "Actix runtime"
|
||||||
keywords = ["network", "framework", "async", "futures"]
|
keywords = ["network", "framework", "async", "futures"]
|
||||||
|
@@ -39,11 +39,20 @@ impl fmt::Debug for ArbiterCommand {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug)]
|
||||||
/// Arbiters provide an asynchronous execution environment for actors, functions
|
/// Arbiters provide an asynchronous execution environment for actors, functions
|
||||||
/// and futures. When an Arbiter is created, they spawn a new OS thread, and
|
/// and futures. When an Arbiter is created, they spawn a new OS thread, and
|
||||||
/// host an event loop. Some Arbiter functions execute on the current thread.
|
/// host an event loop. Some Arbiter functions execute on the current thread.
|
||||||
pub struct Arbiter(UnboundedSender<ArbiterCommand>);
|
pub struct Arbiter {
|
||||||
|
sender: UnboundedSender<ArbiterCommand>,
|
||||||
|
thread_handle: Option<thread::JoinHandle<()>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Clone for Arbiter {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self::with_sender(self.sender.clone())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Default for Arbiter {
|
impl Default for Arbiter {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
@@ -55,7 +64,7 @@ impl Arbiter {
|
|||||||
pub(crate) fn new_system() -> Self {
|
pub(crate) fn new_system() -> Self {
|
||||||
let (tx, rx) = unbounded();
|
let (tx, rx) = unbounded();
|
||||||
|
|
||||||
let arb = Arbiter(tx);
|
let arb = Arbiter::with_sender(tx);
|
||||||
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
|
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
|
||||||
RUNNING.with(|cell| cell.set(false));
|
RUNNING.with(|cell| cell.set(false));
|
||||||
STORAGE.with(|cell| cell.borrow_mut().clear());
|
STORAGE.with(|cell| cell.borrow_mut().clear());
|
||||||
@@ -75,7 +84,7 @@ impl Arbiter {
|
|||||||
|
|
||||||
/// Stop arbiter from continuing it's event loop.
|
/// Stop arbiter from continuing it's event loop.
|
||||||
pub fn stop(&self) {
|
pub fn stop(&self) {
|
||||||
let _ = self.0.unbounded_send(ArbiterCommand::Stop);
|
let _ = self.sender.unbounded_send(ArbiterCommand::Stop);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Spawn new thread and run event loop in spawned thread.
|
/// Spawn new thread and run event loop in spawned thread.
|
||||||
@@ -87,9 +96,9 @@ impl Arbiter {
|
|||||||
let (arb_tx, arb_rx) = unbounded();
|
let (arb_tx, arb_rx) = unbounded();
|
||||||
let arb_tx2 = arb_tx.clone();
|
let arb_tx2 = arb_tx.clone();
|
||||||
|
|
||||||
let _ = thread::Builder::new().name(name.clone()).spawn(move || {
|
let handle = thread::Builder::new().name(name.clone()).spawn(move || {
|
||||||
let mut rt = Builder::new().build_rt().expect("Can not create Runtime");
|
let mut rt = Builder::new().build_rt().expect("Can not create Runtime");
|
||||||
let arb = Arbiter(arb_tx);
|
let arb = Arbiter::with_sender(arb_tx);
|
||||||
|
|
||||||
let (stop, stop_rx) = channel();
|
let (stop, stop_rx) = channel();
|
||||||
RUNNING.with(|cell| cell.set(true));
|
RUNNING.with(|cell| cell.set(true));
|
||||||
@@ -119,9 +128,9 @@ impl Arbiter {
|
|||||||
let _ = System::current()
|
let _ = System::current()
|
||||||
.sys()
|
.sys()
|
||||||
.unbounded_send(SystemCommand::UnregisterArbiter(id));
|
.unbounded_send(SystemCommand::UnregisterArbiter(id));
|
||||||
});
|
}).unwrap_or_else(|err| panic!("Cannot spawn an arbiter's thread {:?}: {:?}", &name, err));
|
||||||
|
|
||||||
Arbiter(arb_tx2)
|
Arbiter{sender: arb_tx2, thread_handle: Some(handle)}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn run_system() {
|
pub(crate) fn run_system() {
|
||||||
@@ -171,7 +180,7 @@ impl Arbiter {
|
|||||||
F: Future<Item = (), Error = ()> + Send + 'static,
|
F: Future<Item = (), Error = ()> + Send + 'static,
|
||||||
{
|
{
|
||||||
let _ = self
|
let _ = self
|
||||||
.0
|
.sender
|
||||||
.unbounded_send(ArbiterCommand::Execute(Box::new(future)));
|
.unbounded_send(ArbiterCommand::Execute(Box::new(future)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -182,7 +191,7 @@ impl Arbiter {
|
|||||||
F: FnOnce() + Send + 'static,
|
F: FnOnce() + Send + 'static,
|
||||||
{
|
{
|
||||||
let _ = self
|
let _ = self
|
||||||
.0
|
.sender
|
||||||
.unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || {
|
.unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || {
|
||||||
f();
|
f();
|
||||||
})));
|
})));
|
||||||
@@ -198,7 +207,7 @@ impl Arbiter {
|
|||||||
{
|
{
|
||||||
let (tx, rx) = channel();
|
let (tx, rx) = channel();
|
||||||
let _ = self
|
let _ = self
|
||||||
.0
|
.sender
|
||||||
.unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || {
|
.unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || {
|
||||||
if !tx.is_canceled() {
|
if !tx.is_canceled() {
|
||||||
let _ = tx.send(f());
|
let _ = tx.send(f());
|
||||||
@@ -250,6 +259,20 @@ impl Arbiter {
|
|||||||
f(item)
|
f(item)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn with_sender(sender: UnboundedSender<ArbiterCommand>) -> Self {
|
||||||
|
Self{sender, thread_handle: None}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wait for the event loop to stop by joining the underlying thread (if have Some).
|
||||||
|
pub fn join(&mut self) -> thread::Result<()>{
|
||||||
|
if let Some(thread_handle) = self.thread_handle.take() {
|
||||||
|
thread_handle.join()
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct ArbiterController {
|
struct ArbiterController {
|
||||||
@@ -260,9 +283,11 @@ struct ArbiterController {
|
|||||||
impl Drop for ArbiterController {
|
impl Drop for ArbiterController {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
if thread::panicking() {
|
if thread::panicking() {
|
||||||
eprintln!("Panic in Arbiter thread, shutting down system.");
|
|
||||||
if System::current().stop_on_panic() {
|
if System::current().stop_on_panic() {
|
||||||
|
eprintln!("Panic in Arbiter thread, shutting down system.");
|
||||||
System::current().stop_with_code(1)
|
System::current().stop_with_code(1)
|
||||||
|
} else {
|
||||||
|
eprintln!("Panic in Arbiter thread.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -1,5 +1,10 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
|
## [0.4.7] - 2019-10-14
|
||||||
|
|
||||||
|
* Re-register task on every framed transport poll.
|
||||||
|
|
||||||
|
|
||||||
## [0.4.6] - 2019-10-08
|
## [0.4.6] - 2019-10-08
|
||||||
|
|
||||||
* Refactor `Counter` type. register current task in available method.
|
* Refactor `Counter` type. register current task in available method.
|
||||||
|
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actix-utils"
|
name = "actix-utils"
|
||||||
version = "0.4.6"
|
version = "0.4.7"
|
||||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||||
description = "Actix utils - various actix net related services"
|
description = "Actix utils - various actix net related services"
|
||||||
keywords = ["network", "framework", "async", "futures"]
|
keywords = ["network", "framework", "async", "futures"]
|
||||||
|
@@ -129,7 +129,6 @@ where
|
|||||||
};
|
};
|
||||||
|
|
||||||
let mut cell = self.inner.clone();
|
let mut cell = self.inner.clone();
|
||||||
cell.get_mut().task.register();
|
|
||||||
tokio_current_thread::spawn(self.service.call(item).then(move |item| {
|
tokio_current_thread::spawn(self.service.call(item).then(move |item| {
|
||||||
let inner = cell.get_mut();
|
let inner = cell.get_mut();
|
||||||
inner.buf.push_back(item);
|
inner.buf.push_back(item);
|
||||||
@@ -293,6 +292,8 @@ where
|
|||||||
type Error = FramedTransportError<S::Error, U>;
|
type Error = FramedTransportError<S::Error, U>;
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
|
self.inner.get_ref().task.register();
|
||||||
|
|
||||||
match mem::replace(&mut self.state, TransportState::Processing) {
|
match mem::replace(&mut self.state, TransportState::Processing) {
|
||||||
TransportState::Processing => {
|
TransportState::Processing => {
|
||||||
if self.poll_read() || self.poll_write() {
|
if self.poll_read() || self.poll_write() {
|
||||||
|
Reference in New Issue
Block a user