1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-01-22 23:05:56 +01:00

use ActorItemsCell

This commit is contained in:
Nikolay Kim 2017-10-09 10:44:11 -07:00
parent 6d2f02ee5e
commit 994a9e907e
2 changed files with 11 additions and 56 deletions

View File

@ -6,7 +6,7 @@ use futures::{Async, Stream, Poll};
use bytes::Bytes; use bytes::Bytes;
use actix::{Actor, ActorState, ActorContext, AsyncActorContext}; use actix::{Actor, ActorState, ActorContext, AsyncActorContext};
use actix::fut::ActorFuture; use actix::fut::ActorFuture;
use actix::dev::{AsyncContextApi, ActorAddressCell}; use actix::dev::{AsyncContextApi, ActorAddressCell, ActorItemsCell, SpawnHandle};
use route::{Route, Frame}; use route::{Route, Frame};
use httpmessage::HttpResponse; use httpmessage::HttpResponse;
@ -17,7 +17,7 @@ pub struct HttpContext<A> where A: Actor<Context=HttpContext<A>> + Route,
{ {
act: Option<A>, act: Option<A>,
state: ActorState, state: ActorState,
items: Vec<Box<ActorFuture<Item=(), Error=(), Actor=A>>>, items: ActorItemsCell<A>,
address: ActorAddressCell<A>, address: ActorAddressCell<A>,
stream: VecDeque<Frame>, stream: VecDeque<Frame>,
app_state: Rc<<A as Route>::State>, app_state: Rc<<A as Route>::State>,
@ -37,7 +37,7 @@ impl<A> ActorContext<A> for HttpContext<A> where A: Actor<Context=Self> + Route
/// Terminate actor execution /// Terminate actor execution
fn terminate(&mut self) { fn terminate(&mut self) {
self.address.close(); self.address.close();
self.items.clear(); self.items.close();
self.state = ActorState::Stopped; self.state = ActorState::Stopped;
} }
@ -49,14 +49,14 @@ impl<A> ActorContext<A> for HttpContext<A> where A: Actor<Context=Self> + Route
impl<A> AsyncActorContext<A> for HttpContext<A> where A: Actor<Context=Self> + Route impl<A> AsyncActorContext<A> for HttpContext<A> where A: Actor<Context=Self> + Route
{ {
fn spawn<F>(&mut self, fut: F) fn spawn<F>(&mut self, fut: F) -> SpawnHandle
where F: ActorFuture<Item=(), Error=(), Actor=A> + 'static where F: ActorFuture<Item=(), Error=(), Actor=A> + 'static
{ {
if self.state == ActorState::Stopped { self.items.spawn(fut)
error!("Context::spawn called for stopped actor.");
} else {
self.items.push(Box::new(fut))
} }
fn cancel_future(&mut self, handle: SpawnHandle) -> bool {
self.items.cancel_future(handle)
} }
} }
@ -74,7 +74,7 @@ impl<A> HttpContext<A> where A: Actor<Context=Self> + Route {
HttpContext { HttpContext {
act: None, act: None,
state: ActorState::Started, state: ActorState::Started,
items: Vec::new(), items: ActorItemsCell::default(),
address: ActorAddressCell::default(), address: ActorAddressCell::default(),
stream: VecDeque::new(), stream: VecDeque::new(),
app_state: state, app_state: state,
@ -147,47 +147,7 @@ impl<A> Stream for HttpContext<A> where A: Actor<Context=Self> + Route
not_ready = false not_ready = false
} }
// check secondary streams self.items.poll(act, ctx);
let mut idx = 0;
let mut len = self.items.len();
loop {
if idx >= len {
break
}
let (drop, item) = match self.items[idx].poll(act, ctx) {
Ok(val) => match val {
Async::Ready(_) => {
not_ready = false;
(true, None)
}
Async::NotReady => (false, None),
},
Err(_) => (true, None)
};
// we have new pollable item
if let Some(item) = item {
self.items.push(item);
}
// number of items could be different, context can add more items
len = self.items.len();
// item finishes, we need to remove it,
// replace current item with last item
if drop {
len -= 1;
if idx >= len {
self.items.pop();
break
} else {
self.items[idx] = self.items.pop().unwrap();
}
} else {
idx += 1;
}
}
// are we done // are we done
if !not_ready { if !not_ready {

View File

@ -56,11 +56,6 @@ impl Frame {
/// Create a new data frame. /// Create a new data frame.
#[inline] #[inline]
pub fn message(data: Vec<u8>, code: OpCode, finished: bool) -> Frame { pub fn message(data: Vec<u8>, code: OpCode, finished: bool) -> Frame {
debug_assert!(match code {
OpCode::Text | OpCode::Binary | OpCode::Continue => true,
_ => false,
}, "Invalid opcode for data frame.");
Frame { Frame {
finished: finished, finished: finished,
opcode: code, opcode: code,