1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-27 17:22:57 +01:00

Fix disconnection handling

This commit is contained in:
Nikolay Kim 2017-10-25 16:25:26 -07:00
parent da79981d90
commit 86583049fa
7 changed files with 188 additions and 87 deletions

View File

@ -11,6 +11,8 @@
* Re-use `BinaryBody` for `Frame::Payload` * Re-use `BinaryBody` for `Frame::Payload`
* Fix disconnection handling.
## 0.1.0 (2017-10-23) ## 0.1.0 (2017-10-23)

View File

@ -49,7 +49,7 @@ tokio-proto = "0.1"
# h2 = { git = 'https://github.com/carllerche/h2', optional = true } # h2 = { git = 'https://github.com/carllerche/h2', optional = true }
[dependencies.actix] [dependencies.actix]
#version = "0.3" #version = ">=0.3.1"
#path = "../actix" #path = "../actix"
git = "https://github.com/actix/actix.git" git = "https://github.com/actix/actix.git"
default-features = false default-features = false

View File

@ -49,7 +49,7 @@ impl Body {
} }
/// Create body from slice (copy) /// Create body from slice (copy)
pub fn from_slice<'a>(s: &'a [u8]) -> Body { pub fn from_slice(s: &[u8]) -> Body {
Body::Binary(BinaryBody::Bytes(Bytes::from(s))) Body::Binary(BinaryBody::Bytes(Bytes::from(s)))
} }
} }
@ -61,19 +61,23 @@ impl<T> From<T> for Body where T: Into<BinaryBody>{
} }
impl BinaryBody { impl BinaryBody {
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn len(&self) -> usize { pub fn len(&self) -> usize {
match self { match *self {
&BinaryBody::Bytes(ref bytes) => bytes.len(), BinaryBody::Bytes(ref bytes) => bytes.len(),
&BinaryBody::Slice(slice) => slice.len(), BinaryBody::Slice(slice) => slice.len(),
&BinaryBody::SharedBytes(ref bytes) => bytes.len(), BinaryBody::SharedBytes(ref bytes) => bytes.len(),
&BinaryBody::ArcSharedBytes(ref bytes) => bytes.len(), BinaryBody::ArcSharedBytes(ref bytes) => bytes.len(),
&BinaryBody::SharedString(ref s) => s.len(), BinaryBody::SharedString(ref s) => s.len(),
&BinaryBody::ArcSharedString(ref s) => s.len(), BinaryBody::ArcSharedString(ref s) => s.len(),
} }
} }
/// Create binary body from slice /// Create binary body from slice
pub fn from_slice<'a>(s: &'a [u8]) -> BinaryBody { pub fn from_slice(s: &[u8]) -> BinaryBody {
BinaryBody::Bytes(Bytes::from(s)) BinaryBody::Bytes(Bytes::from(s))
} }
} }
@ -164,13 +168,13 @@ impl<'a> From<&'a Arc<String>> for BinaryBody {
impl AsRef<[u8]> for BinaryBody { impl AsRef<[u8]> for BinaryBody {
fn as_ref(&self) -> &[u8] { fn as_ref(&self) -> &[u8] {
match self { match *self {
&BinaryBody::Bytes(ref bytes) => bytes.as_ref(), BinaryBody::Bytes(ref bytes) => bytes.as_ref(),
&BinaryBody::Slice(slice) => slice, BinaryBody::Slice(slice) => slice,
&BinaryBody::SharedBytes(ref bytes) => bytes.as_ref(), BinaryBody::SharedBytes(ref bytes) => bytes.as_ref(),
&BinaryBody::ArcSharedBytes(ref bytes) => bytes.as_ref(), BinaryBody::ArcSharedBytes(ref bytes) => bytes.as_ref(),
&BinaryBody::SharedString(ref s) => s.as_bytes(), BinaryBody::SharedString(ref s) => s.as_bytes(),
&BinaryBody::ArcSharedString(ref s) => s.as_bytes(), BinaryBody::ArcSharedString(ref s) => s.as_bytes(),
} }
} }
} }

View File

@ -10,6 +10,7 @@ use actix::fut::ActorFuture;
use actix::dev::{AsyncContextApi, ActorAddressCell, ActorItemsCell, ActorWaitCell, SpawnHandle, use actix::dev::{AsyncContextApi, ActorAddressCell, ActorItemsCell, ActorWaitCell, SpawnHandle,
Envelope, ToEnvelope, RemoteEnvelope}; Envelope, ToEnvelope, RemoteEnvelope};
use task::IoContext;
use body::BinaryBody; use body::BinaryBody;
use route::{Route, Frame}; use route::{Route, Frame};
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
@ -26,10 +27,20 @@ pub struct HttpContext<A> where A: Actor<Context=HttpContext<A>> + Route,
stream: VecDeque<Frame>, stream: VecDeque<Frame>,
wait: ActorWaitCell<A>, wait: ActorWaitCell<A>,
app_state: Rc<<A as Route>::State>, app_state: Rc<<A as Route>::State>,
disconnected: bool,
} }
impl<A> IoContext for HttpContext<A> where A: Actor<Context=Self> + Route {
impl<A> ActorContext<A> for HttpContext<A> where A: Actor<Context=Self> + Route fn disconnected(&mut self) {
self.disconnected = true;
if self.state == ActorState::Running {
self.state = ActorState::Stopping;
}
}
}
impl<A> ActorContext for HttpContext<A> where A: Actor<Context=Self> + Route
{ {
/// Stop actor execution /// Stop actor execution
fn stop(&mut self) { fn stop(&mut self) {
@ -95,6 +106,7 @@ impl<A> HttpContext<A> where A: Actor<Context=Self> + Route {
wait: ActorWaitCell::default(), wait: ActorWaitCell::default(),
stream: VecDeque::new(), stream: VecDeque::new(),
app_state: state, app_state: state,
disconnected: false,
} }
} }
@ -124,6 +136,11 @@ impl<A> HttpContext<A> where A: Actor<Context=Self> + Route {
pub fn write_eof(&mut self) { pub fn write_eof(&mut self) {
self.stream.push_back(Frame::Payload(None)) self.stream.push_back(Frame::Payload(None))
} }
/// Check if connection still open
pub fn connected(&self) -> bool {
!self.disconnected
}
} }
impl<A> HttpContext<A> where A: Actor<Context=Self> + Route { impl<A> HttpContext<A> where A: Actor<Context=Self> + Route {
@ -157,7 +174,6 @@ impl<A> Stream for HttpContext<A> where A: Actor<Context=Self> + Route
if self.act.is_none() { if self.act.is_none() {
return Ok(Async::NotReady) return Ok(Async::NotReady)
} }
let act: &mut A = unsafe { let act: &mut A = unsafe {
std::mem::transmute(self.act.as_mut().unwrap() as &mut A) std::mem::transmute(self.act.as_mut().unwrap() as &mut A)
}; };

View File

@ -158,7 +158,7 @@ impl<A> Reply<A> where A: Actor + Route
}, },
ReplyItem::Actor(act) => { ReplyItem::Actor(act) => {
ctx.set_actor(act); ctx.set_actor(act);
Task::with_stream(ctx) Task::with_context(ctx)
} }
} }
} }

View File

@ -171,11 +171,11 @@ pub struct HttpChannel<T: 'static, A: 'static, H: 'static> {
keepalive_timer: Option<Timeout>, keepalive_timer: Option<Timeout>,
} }
/*impl<T: 'static, A: 'static> Drop for HttpChannel<T, A> { impl<T: 'static, A: 'static, H: 'static> Drop for HttpChannel<T, A, H> {
fn drop(&mut self) { fn drop(&mut self) {
println!("Drop http channel"); println!("Drop http channel");
} }
}*/ }
impl<T, A, H> Actor for HttpChannel<T, A, H> impl<T, A, H> Actor for HttpChannel<T, A, H>
where T: AsyncRead + AsyncWrite + 'static, where T: AsyncRead + AsyncWrite + 'static,
@ -205,6 +205,8 @@ impl<T, A, H> Future for HttpChannel<T, A, H>
} }
loop { loop {
let mut not_ready = true;
// check in-flight messages // check in-flight messages
let mut idx = 0; let mut idx = 0;
while idx < self.items.len() { while idx < self.items.len() {
@ -218,6 +220,7 @@ impl<T, A, H> Future for HttpChannel<T, A, H>
match self.items[idx].task.poll_io(&mut self.stream, req) match self.items[idx].task.poll_io(&mut self.stream, req)
{ {
Ok(Async::Ready(ready)) => { Ok(Async::Ready(ready)) => {
not_ready = false;
let mut item = self.items.pop_front().unwrap(); let mut item = self.items.pop_front().unwrap();
// overide keep-alive state // overide keep-alive state
@ -247,8 +250,10 @@ impl<T, A, H> Future for HttpChannel<T, A, H>
} else if !self.items[idx].finished && !self.items[idx].error { } else if !self.items[idx].finished && !self.items[idx].error {
match self.items[idx].task.poll() { match self.items[idx].task.poll() {
Ok(Async::NotReady) => (), Ok(Async::NotReady) => (),
Ok(Async::Ready(_)) => Ok(Async::Ready(_)) => {
self.items[idx].finished = true, not_ready = false;
self.items[idx].finished = true;
},
Err(_) => Err(_) =>
self.items[idx].error = true, self.items[idx].error = true,
} }
@ -267,8 +272,10 @@ impl<T, A, H> Future for HttpChannel<T, A, H>
if !self.inactive[idx].finished && !self.inactive[idx].error { if !self.inactive[idx].finished && !self.inactive[idx].error {
match self.inactive[idx].task.poll() { match self.inactive[idx].task.poll() {
Ok(Async::NotReady) => (), Ok(Async::NotReady) => (),
Ok(Async::Ready(_)) => Ok(Async::Ready(_)) => {
self.inactive[idx].finished = true, not_ready = false;
self.inactive[idx].finished = true
}
Err(_) => Err(_) =>
self.inactive[idx].error = true, self.inactive[idx].error = true,
} }
@ -280,6 +287,8 @@ impl<T, A, H> Future for HttpChannel<T, A, H>
if !self.error && self.items.len() < MAX_PIPELINED_MESSAGES { if !self.error && self.items.len() < MAX_PIPELINED_MESSAGES {
match self.reader.parse(&mut self.stream) { match self.reader.parse(&mut self.stream) {
Ok(Async::Ready((mut req, payload))) => { Ok(Async::Ready((mut req, payload))) => {
not_ready = false;
// stop keepalive timer // stop keepalive timer
self.keepalive_timer.take(); self.keepalive_timer.take();
@ -300,6 +309,12 @@ impl<T, A, H> Future for HttpChannel<T, A, H>
finished: false}); finished: false});
} }
Err(err) => { Err(err) => {
// notify all tasks
not_ready = false;
for entry in &mut self.items {
entry.task.disconnected()
}
// kill keepalive // kill keepalive
self.keepalive = false; self.keepalive = false;
self.keepalive_timer.take(); self.keepalive_timer.take();
@ -344,6 +359,10 @@ impl<T, A, H> Future for HttpChannel<T, A, H>
if self.items.is_empty() && self.inactive.is_empty() && self.error { if self.items.is_empty() && self.inactive.is_empty() && self.error {
return Ok(Async::Ready(())) return Ok(Async::Ready(()))
} }
if not_ready {
return Ok(Async::NotReady)
}
} }
} }
} }

View File

@ -1,4 +1,4 @@
use std::{cmp, io}; use std::{mem, cmp, io};
use std::rc::Rc; use std::rc::Rc;
use std::fmt::Write; use std::fmt::Write;
use std::collections::VecDeque; use std::collections::VecDeque;
@ -47,16 +47,27 @@ impl TaskIOState {
} }
} }
enum TaskStream {
None,
Stream(Box<FrameStream>),
Context(Box<IoContext<Item=Frame, Error=io::Error>>),
}
pub(crate) trait IoContext: Stream<Item=Frame, Error=io::Error> + 'static {
fn disconnected(&mut self);
}
pub struct Task { pub struct Task {
state: TaskRunningState, state: TaskRunningState,
iostate: TaskIOState, iostate: TaskIOState,
frames: VecDeque<Frame>, frames: VecDeque<Frame>,
stream: Option<Box<FrameStream>>, stream: TaskStream,
encoder: Encoder, encoder: Encoder,
buffer: BytesMut, buffer: BytesMut,
upgrade: bool, upgrade: bool,
keepalive: bool, keepalive: bool,
prepared: Option<HttpResponse>, prepared: Option<HttpResponse>,
disconnected: bool,
middlewares: Option<Rc<Vec<Box<Middleware>>>>, middlewares: Option<Rc<Vec<Box<Middleware>>>>,
} }
@ -71,12 +82,13 @@ impl Task {
state: TaskRunningState::Running, state: TaskRunningState::Running,
iostate: TaskIOState::Done, iostate: TaskIOState::Done,
frames: frames, frames: frames,
stream: None, stream: TaskStream::None,
encoder: Encoder::length(0), encoder: Encoder::length(0),
buffer: BytesMut::new(), buffer: BytesMut::new(),
upgrade: false, upgrade: false,
keepalive: false, keepalive: false,
prepared: None, prepared: None,
disconnected: false,
middlewares: None, middlewares: None,
} }
} }
@ -88,12 +100,30 @@ impl Task {
state: TaskRunningState::Running, state: TaskRunningState::Running,
iostate: TaskIOState::ReadingMessage, iostate: TaskIOState::ReadingMessage,
frames: VecDeque::new(), frames: VecDeque::new(),
stream: Some(Box::new(stream)), stream: TaskStream::Stream(Box::new(stream)),
encoder: Encoder::length(0), encoder: Encoder::length(0),
buffer: BytesMut::new(), buffer: BytesMut::new(),
upgrade: false, upgrade: false,
keepalive: false, keepalive: false,
prepared: None, prepared: None,
disconnected: false,
middlewares: None,
}
}
pub(crate) fn with_context<C: IoContext>(ctx: C) -> Self
{
Task {
state: TaskRunningState::Running,
iostate: TaskIOState::ReadingMessage,
frames: VecDeque::new(),
stream: TaskStream::Context(Box::new(ctx)),
encoder: Encoder::length(0),
buffer: BytesMut::new(),
upgrade: false,
keepalive: false,
prepared: None,
disconnected: false,
middlewares: None, middlewares: None,
} }
} }
@ -106,6 +136,15 @@ impl Task {
self.middlewares = Some(middlewares); self.middlewares = Some(middlewares);
} }
pub(crate) fn disconnected(&mut self) {
let len = self.buffer.len();
self.buffer.split_to(len);
self.disconnected = true;
if let TaskStream::Context(ref mut ctx) = self.stream {
ctx.disconnected();
}
}
fn prepare(&mut self, req: &mut HttpRequest, msg: HttpResponse) fn prepare(&mut self, req: &mut HttpRequest, msg: HttpResponse)
{ {
trace!("Prepare message status={:?}", msg.status); trace!("Prepare message status={:?}", msg.status);
@ -252,9 +291,12 @@ impl Task {
trace!("IO Frame: {:?}", frame); trace!("IO Frame: {:?}", frame);
match frame { match frame {
Frame::Message(response) => { Frame::Message(response) => {
if !self.disconnected {
self.prepare(req, response); self.prepare(req, response);
} }
}
Frame::Payload(Some(chunk)) => { Frame::Payload(Some(chunk)) => {
if !self.disconnected {
if self.prepared.is_some() { if self.prepared.is_some() {
// TODO: add warning, write after EOF // TODO: add warning, write after EOF
self.encoder.encode(&mut self.buffer, chunk.as_ref()); self.encoder.encode(&mut self.buffer, chunk.as_ref());
@ -262,10 +304,13 @@ impl Task {
// might be response for EXCEPT // might be response for EXCEPT
self.buffer.extend_from_slice(chunk.as_ref()) self.buffer.extend_from_slice(chunk.as_ref())
} }
}
}, },
Frame::Payload(None) => { Frame::Payload(None) => {
if !self.disconnected &&
!self.encoder.encode(&mut self.buffer, [].as_ref())
{
// TODO: add error "not eof"" // TODO: add error "not eof""
if !self.encoder.encode(&mut self.buffer, [].as_ref()) {
debug!("last payload item, but it is not EOF "); debug!("last payload item, but it is not EOF ");
return Err(()) return Err(())
} }
@ -276,6 +321,7 @@ impl Task {
} }
// write bytes to TcpStream // write bytes to TcpStream
if !self.disconnected {
while !self.buffer.is_empty() { while !self.buffer.is_empty() {
match io.write(self.buffer.as_ref()) { match io.write(self.buffer.as_ref()) {
Ok(n) => { Ok(n) => {
@ -287,6 +333,7 @@ impl Task {
Err(_) => return Err(()), Err(_) => return Err(()),
} }
} }
}
// should pause task // should pause task
if self.state != TaskRunningState::Done { if self.state != TaskRunningState::Done {
@ -295,10 +342,13 @@ impl Task {
} else if self.state == TaskRunningState::Paused { } else if self.state == TaskRunningState::Paused {
self.state = TaskRunningState::Running; self.state = TaskRunningState::Running;
} }
} else {
// at this point we wont get any more Frames
self.iostate = TaskIOState::Done;
} }
// response is completed // response is completed
if self.buffer.is_empty() && self.iostate.is_done() { if (self.buffer.is_empty() || self.disconnected) && self.iostate.is_done() {
// run middlewares // run middlewares
if let Some(ref mut resp) = self.prepared { if let Some(ref mut resp) = self.prepared {
if let Some(middlewares) = self.middlewares.take() { if let Some(middlewares) = self.middlewares.take() {
@ -313,14 +363,10 @@ impl Task {
Ok(Async::NotReady) Ok(Async::NotReady)
} }
} }
}
impl Future for Task { fn poll_stream<S>(&mut self, stream: &mut S) -> Poll<(), ()>
type Item = (); where S: Stream<Item=Frame, Error=io::Error>
type Error = (); {
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(ref mut stream) = self.stream {
loop { loop {
match stream.poll() { match stream.poll() {
Ok(Async::Ready(Some(frame))) => { Ok(Async::Ready(Some(frame))) => {
@ -356,10 +402,24 @@ impl Future for Task {
return Err(()) return Err(())
} }
} }
} else {
Ok(Async::Ready(()))
} }
} }
impl Future for Task {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let mut s = mem::replace(&mut self.stream, TaskStream::None);
let result = match s {
TaskStream::None => Ok(Async::Ready(())),
TaskStream::Stream(ref mut stream) => self.poll_stream(stream),
TaskStream::Context(ref mut context) => self.poll_stream(context),
};
self.stream = s;
result
}
} }
/// Encoders to handle different Transfer-Encodings. /// Encoders to handle different Transfer-Encodings.