diff --git a/.appveyor.yml b/.appveyor.yml index 4bcd7732..7addc8c0 100644 --- a/.appveyor.yml +++ b/.appveyor.yml @@ -3,26 +3,13 @@ environment: PROJECT_NAME: actix matrix: # Stable channel - - TARGET: i686-pc-windows-gnu - CHANNEL: stable - TARGET: i686-pc-windows-msvc CHANNEL: stable - TARGET: x86_64-pc-windows-gnu CHANNEL: stable - TARGET: x86_64-pc-windows-msvc CHANNEL: stable - # Beta channel - - TARGET: i686-pc-windows-gnu - CHANNEL: beta - - TARGET: i686-pc-windows-msvc - CHANNEL: beta - - TARGET: x86_64-pc-windows-gnu - CHANNEL: beta - - TARGET: x86_64-pc-windows-msvc - CHANNEL: beta # Nightly channel - - TARGET: i686-pc-windows-gnu - CHANNEL: nightly - TARGET: i686-pc-windows-msvc CHANNEL: nightly - TARGET: x86_64-pc-windows-gnu diff --git a/src/pipeline.rs b/src/pipeline.rs index 528680f5..66b2f29a 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -580,6 +580,7 @@ impl ProcessResponse { Frame::Chunk(Some(chunk)) => { match io.write(&chunk) { Err(err) => { + info.context = Some(ctx); info.error = Some(err.into()); return Ok( FinishingMiddlewares::init( @@ -606,6 +607,7 @@ impl ProcessResponse { break; } Err(err) => { + info.context = Some(ctx); info.error = Some(err); return Ok(FinishingMiddlewares::init( info, mws, self.resp, @@ -641,6 +643,12 @@ impl ProcessResponse { } Ok(Async::NotReady) => return Err(PipelineState::Response(self)), Err(err) => { + if let IOState::Actor(mut ctx) = + mem::replace(&mut self.iostate, IOState::Done) + { + ctx.disconnected(); + info.context = Some(ctx); + } info.error = Some(err.into()); return Ok(FinishingMiddlewares::init(info, mws, self.resp)); } @@ -755,7 +763,13 @@ impl Completed { if info.context.is_none() { PipelineState::None } else { - PipelineState::Completed(Completed(PhantomData, PhantomData)) + match info.poll_context() { + Ok(Async::NotReady) => { + PipelineState::Completed(Completed(PhantomData, PhantomData)) + } + Ok(Async::Ready(())) => PipelineState::None, + Err(_) => PipelineState::Error, + } } } diff --git a/src/server/channel.rs b/src/server/channel.rs index 1439ddcb..b817b416 100644 --- a/src/server/channel.rs +++ b/src/server/channel.rs @@ -94,13 +94,13 @@ where self.node = Some(Node::new(el)); let _ = match self.proto { Some(HttpProtocol::H1(ref mut h1)) => { - self.node.as_ref().map(|n| h1.settings().head().insert(n)) + self.node.as_mut().map(|n| h1.settings().head().insert(n)) } Some(HttpProtocol::H2(ref mut h2)) => { - self.node.as_ref().map(|n| h2.settings().head().insert(n)) + self.node.as_mut().map(|n| h2.settings().head().insert(n)) } Some(HttpProtocol::Unknown(ref mut settings, _, _, _)) => { - self.node.as_ref().map(|n| settings.head().insert(n)) + self.node.as_mut().map(|n| settings.head().insert(n)) } None => unreachable!(), }; @@ -188,8 +188,8 @@ where } pub(crate) struct Node { - next: Option<*mut Node<()>>, - prev: Option<*mut Node<()>>, + next: Option<*mut Node>, + prev: Option<*mut Node>, element: *mut T, } @@ -202,19 +202,18 @@ impl Node { } } - fn insert(&self, next: &Node) { + fn insert(&mut self, next: &mut Node) { unsafe { - if let Some(ref next2) = self.next { - let n: &mut Node<()> = - &mut *(next2.as_ref().unwrap() as *const _ as *mut _); - n.prev = Some(next as *const _ as *mut _); + let next: *mut Node = next as *const _ as *mut _; + + if let Some(ref mut next2) = self.next { + let n = next2.as_mut().unwrap(); + n.prev = Some(next); } - let slf: &mut Node = &mut *(self as *const _ as *mut _); + self.next = Some(next); - slf.next = Some(next as *const _ as *mut _); - - let next: &mut Node = &mut *(next as *const _ as *mut _); - next.prev = Some(slf as *const _ as *mut _); + let next: &mut Node = &mut *next; + next.prev = Some(self as *mut _); } } diff --git a/src/server/h1.rs b/src/server/h1.rs index 6b1a5b9c..5b83dcc0 100644 --- a/src/server/h1.rs +++ b/src/server/h1.rs @@ -127,8 +127,8 @@ where fn notify_disconnect(&mut self) { // notify all tasks self.stream.disconnected(); - for entry in &mut self.tasks { - entry.pipe.disconnected() + for task in &mut self.tasks { + task.pipe.disconnected(); } } @@ -239,6 +239,7 @@ where if let Ok(Async::NotReady) = self.stream.poll_completed(true) { return Ok(Async::NotReady); } + self.flags.insert(Flags::ERROR); return Err(()); } @@ -272,14 +273,10 @@ where Err(err) => { // it is not possible to recover from error // during pipe handling, so just drop connection - error!("Unhandled error: {}", err); + self.notify_disconnect(); self.tasks[idx].flags.insert(EntryFlags::ERROR); - - // check stream state, we still can have valid data in buffer - if let Ok(Async::NotReady) = self.stream.poll_completed(true) { - return Ok(Async::NotReady); - } - return Err(()); + error!("Unhandled error1: {}", err); + continue; } } } else if !self.tasks[idx].flags.contains(EntryFlags::FINISHED) { @@ -292,6 +289,7 @@ where self.notify_disconnect(); self.tasks[idx].flags.insert(EntryFlags::ERROR); error!("Unhandled error: {}", err); + continue; } } } diff --git a/src/server/settings.rs b/src/server/settings.rs index 6ff9c298..cc2e1c06 100644 --- a/src/server/settings.rs +++ b/src/server/settings.rs @@ -7,9 +7,9 @@ use std::{env, fmt, net}; use bytes::BytesMut; use futures_cpupool::CpuPool; use http::StatusCode; +use lazycell::LazyCell; use parking_lot::Mutex; use time; -use lazycell::LazyCell; use super::channel::Node; use super::message::{Request, RequestPool}; @@ -151,7 +151,7 @@ pub(crate) struct WorkerSettings { bytes: Rc, messages: &'static RequestPool, channels: Cell, - node: Box>, + node: RefCell>, date: UnsafeCell, } @@ -170,7 +170,7 @@ impl WorkerSettings { bytes: Rc::new(SharedBytesPool::new()), messages: RequestPool::pool(settings), channels: Cell::new(0), - node: Box::new(Node::head()), + node: RefCell::new(Node::head()), date: UnsafeCell::new(Date::new()), keep_alive, ka_enabled, @@ -181,8 +181,8 @@ impl WorkerSettings { self.channels.get() } - pub fn head(&self) -> &Node<()> { - &self.node + pub fn head(&self) -> RefMut> { + self.node.borrow_mut() } pub fn handlers(&self) -> RefMut> { diff --git a/src/server/srv.rs b/src/server/srv.rs index 8582ab1f..02580d01 100644 --- a/src/server/srv.rs +++ b/src/server/srv.rs @@ -477,10 +477,7 @@ impl HttpServer { /// .run(); /// } /// ``` - pub fn run(mut self) { - self.exit = true; - self.no_signals = false; - + pub fn run(self) { let sys = System::new("http-server"); self.start(); sys.run();