1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-02-17 15:53:31 +01:00

Re-register task on every future poll

This commit is contained in:
Nikolay Kim 2019-10-14 17:55:52 +06:00
parent 115e82329f
commit 2e8c2c7733
7 changed files with 21 additions and 4 deletions

View File

@ -1,5 +1,11 @@
# Changes # Changes
## [0.1.1] - 2019-10-14
* Re-register task on every dispatcher poll.
## [0.1.0] - 2019-09-25 ## [0.1.0] - 2019-09-25
* Initial release * Initial release

View File

@ -1,6 +1,6 @@
[package] [package]
name = "actix-ioframe" name = "actix-ioframe"
version = "0.1.0" version = "0.1.1"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix framed service" description = "Actix framed service"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]

View File

@ -29,6 +29,10 @@ impl<T> Cell<T> {
} }
} }
pub(crate) unsafe fn get_ref(&mut self) -> &T {
&*self.inner.as_ref().get()
}
pub(crate) unsafe fn get_mut(&mut self) -> &mut T { pub(crate) unsafe fn get_mut(&mut self) -> &mut T {
&mut *self.inner.as_ref().get() &mut *self.inner.as_ref().get()
} }

View File

@ -154,7 +154,6 @@ where
}; };
let mut cell = self.inner.clone(); let mut cell = self.inner.clone();
unsafe { cell.get_mut().task.register() };
tokio_current_thread::spawn( tokio_current_thread::spawn(
self.service self.service
.call(Item::new(self.state.clone(), self.sink.clone(), item)) .call(Item::new(self.state.clone(), self.sink.clone(), item))
@ -275,6 +274,8 @@ where
type Error = ServiceError<S::Error, U>; type Error = ServiceError<S::Error, U>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
unsafe { self.inner.get_ref().task.register() };
match mem::replace(&mut self.dispatch_state, FramedState::Processing) { match mem::replace(&mut self.dispatch_state, FramedState::Processing) {
FramedState::Processing => { FramedState::Processing => {
if self.poll_read() || self.poll_write() { if self.poll_read() || self.poll_write() {

View File

@ -1,5 +1,10 @@
# Changes # Changes
## [0.4.7] - 2019-10-14
* Re-register task on every framed transport poll.
## [0.4.6] - 2019-10-08 ## [0.4.6] - 2019-10-08
* Refactor `Counter` type. register current task in available method. * Refactor `Counter` type. register current task in available method.

View File

@ -1,6 +1,6 @@
[package] [package]
name = "actix-utils" name = "actix-utils"
version = "0.4.6" version = "0.4.7"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix utils - various actix net related services" description = "Actix utils - various actix net related services"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]

View File

@ -129,7 +129,6 @@ where
}; };
let mut cell = self.inner.clone(); let mut cell = self.inner.clone();
cell.get_mut().task.register();
tokio_current_thread::spawn(self.service.call(item).then(move |item| { tokio_current_thread::spawn(self.service.call(item).then(move |item| {
let inner = cell.get_mut(); let inner = cell.get_mut();
inner.buf.push_back(item); inner.buf.push_back(item);
@ -293,6 +292,8 @@ where
type Error = FramedTransportError<S::Error, U>; type Error = FramedTransportError<S::Error, U>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.get_ref().task.register();
match mem::replace(&mut self.state, TransportState::Processing) { match mem::replace(&mut self.state, TransportState::Processing) {
TransportState::Processing => { TransportState::Processing => {
if self.poll_read() || self.poll_write() { if self.poll_read() || self.poll_write() {