1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-12-01 02:44:37 +01:00

Merge branch 'master' into apply-mask

This commit is contained in:
Nikolay Kim 2018-07-11 13:15:35 +06:00 committed by GitHub
commit b22132d3d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 42 additions and 47 deletions

View File

@ -3,26 +3,13 @@ environment:
PROJECT_NAME: actix PROJECT_NAME: actix
matrix: matrix:
# Stable channel # Stable channel
- TARGET: i686-pc-windows-gnu
CHANNEL: stable
- TARGET: i686-pc-windows-msvc - TARGET: i686-pc-windows-msvc
CHANNEL: stable CHANNEL: stable
- TARGET: x86_64-pc-windows-gnu - TARGET: x86_64-pc-windows-gnu
CHANNEL: stable CHANNEL: stable
- TARGET: x86_64-pc-windows-msvc - TARGET: x86_64-pc-windows-msvc
CHANNEL: stable CHANNEL: stable
# Beta channel
- TARGET: i686-pc-windows-gnu
CHANNEL: beta
- TARGET: i686-pc-windows-msvc
CHANNEL: beta
- TARGET: x86_64-pc-windows-gnu
CHANNEL: beta
- TARGET: x86_64-pc-windows-msvc
CHANNEL: beta
# Nightly channel # Nightly channel
- TARGET: i686-pc-windows-gnu
CHANNEL: nightly
- TARGET: i686-pc-windows-msvc - TARGET: i686-pc-windows-msvc
CHANNEL: nightly CHANNEL: nightly
- TARGET: x86_64-pc-windows-gnu - TARGET: x86_64-pc-windows-gnu

View File

@ -580,6 +580,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
Frame::Chunk(Some(chunk)) => { Frame::Chunk(Some(chunk)) => {
match io.write(&chunk) { match io.write(&chunk) {
Err(err) => { Err(err) => {
info.context = Some(ctx);
info.error = Some(err.into()); info.error = Some(err.into());
return Ok( return Ok(
FinishingMiddlewares::init( FinishingMiddlewares::init(
@ -606,6 +607,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
break; break;
} }
Err(err) => { Err(err) => {
info.context = Some(ctx);
info.error = Some(err); info.error = Some(err);
return Ok(FinishingMiddlewares::init( return Ok(FinishingMiddlewares::init(
info, mws, self.resp, info, mws, self.resp,
@ -641,6 +643,12 @@ impl<S: 'static, H> ProcessResponse<S, H> {
} }
Ok(Async::NotReady) => return Err(PipelineState::Response(self)), Ok(Async::NotReady) => return Err(PipelineState::Response(self)),
Err(err) => { Err(err) => {
if let IOState::Actor(mut ctx) =
mem::replace(&mut self.iostate, IOState::Done)
{
ctx.disconnected();
info.context = Some(ctx);
}
info.error = Some(err.into()); info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(info, mws, self.resp)); return Ok(FinishingMiddlewares::init(info, mws, self.resp));
} }
@ -755,8 +763,14 @@ impl<S, H> Completed<S, H> {
if info.context.is_none() { if info.context.is_none() {
PipelineState::None PipelineState::None
} else { } else {
match info.poll_context() {
Ok(Async::NotReady) => {
PipelineState::Completed(Completed(PhantomData, PhantomData)) PipelineState::Completed(Completed(PhantomData, PhantomData))
} }
Ok(Async::Ready(())) => PipelineState::None,
Err(_) => PipelineState::Error,
}
}
} }
#[inline] #[inline]

View File

@ -94,13 +94,13 @@ where
self.node = Some(Node::new(el)); self.node = Some(Node::new(el));
let _ = match self.proto { let _ = match self.proto {
Some(HttpProtocol::H1(ref mut h1)) => { Some(HttpProtocol::H1(ref mut h1)) => {
self.node.as_ref().map(|n| h1.settings().head().insert(n)) self.node.as_mut().map(|n| h1.settings().head().insert(n))
} }
Some(HttpProtocol::H2(ref mut h2)) => { Some(HttpProtocol::H2(ref mut h2)) => {
self.node.as_ref().map(|n| h2.settings().head().insert(n)) self.node.as_mut().map(|n| h2.settings().head().insert(n))
} }
Some(HttpProtocol::Unknown(ref mut settings, _, _, _)) => { Some(HttpProtocol::Unknown(ref mut settings, _, _, _)) => {
self.node.as_ref().map(|n| settings.head().insert(n)) self.node.as_mut().map(|n| settings.head().insert(n))
} }
None => unreachable!(), None => unreachable!(),
}; };
@ -188,8 +188,8 @@ where
} }
pub(crate) struct Node<T> { pub(crate) struct Node<T> {
next: Option<*mut Node<()>>, next: Option<*mut Node<T>>,
prev: Option<*mut Node<()>>, prev: Option<*mut Node<T>>,
element: *mut T, element: *mut T,
} }
@ -202,19 +202,18 @@ impl<T> Node<T> {
} }
} }
fn insert<I>(&self, next: &Node<I>) { fn insert<I>(&mut self, next: &mut Node<I>) {
unsafe { unsafe {
if let Some(ref next2) = self.next { let next: *mut Node<T> = next as *const _ as *mut _;
let n: &mut Node<()> =
&mut *(next2.as_ref().unwrap() as *const _ as *mut _); if let Some(ref mut next2) = self.next {
n.prev = Some(next as *const _ as *mut _); let n = next2.as_mut().unwrap();
n.prev = Some(next);
} }
let slf: &mut Node<T> = &mut *(self as *const _ as *mut _); self.next = Some(next);
slf.next = Some(next as *const _ as *mut _); let next: &mut Node<T> = &mut *next;
next.prev = Some(self as *mut _);
let next: &mut Node<T> = &mut *(next as *const _ as *mut _);
next.prev = Some(slf as *const _ as *mut _);
} }
} }

View File

@ -127,8 +127,8 @@ where
fn notify_disconnect(&mut self) { fn notify_disconnect(&mut self) {
// notify all tasks // notify all tasks
self.stream.disconnected(); self.stream.disconnected();
for entry in &mut self.tasks { for task in &mut self.tasks {
entry.pipe.disconnected() task.pipe.disconnected();
} }
} }
@ -239,6 +239,7 @@ where
if let Ok(Async::NotReady) = self.stream.poll_completed(true) { if let Ok(Async::NotReady) = self.stream.poll_completed(true) {
return Ok(Async::NotReady); return Ok(Async::NotReady);
} }
self.flags.insert(Flags::ERROR);
return Err(()); return Err(());
} }
@ -272,14 +273,10 @@ where
Err(err) => { Err(err) => {
// it is not possible to recover from error // it is not possible to recover from error
// during pipe handling, so just drop connection // during pipe handling, so just drop connection
error!("Unhandled error: {}", err); self.notify_disconnect();
self.tasks[idx].flags.insert(EntryFlags::ERROR); self.tasks[idx].flags.insert(EntryFlags::ERROR);
error!("Unhandled error1: {}", err);
// check stream state, we still can have valid data in buffer continue;
if let Ok(Async::NotReady) = self.stream.poll_completed(true) {
return Ok(Async::NotReady);
}
return Err(());
} }
} }
} else if !self.tasks[idx].flags.contains(EntryFlags::FINISHED) { } else if !self.tasks[idx].flags.contains(EntryFlags::FINISHED) {
@ -292,6 +289,7 @@ where
self.notify_disconnect(); self.notify_disconnect();
self.tasks[idx].flags.insert(EntryFlags::ERROR); self.tasks[idx].flags.insert(EntryFlags::ERROR);
error!("Unhandled error: {}", err); error!("Unhandled error: {}", err);
continue;
} }
} }
} }

View File

@ -7,9 +7,9 @@ use std::{env, fmt, net};
use bytes::BytesMut; use bytes::BytesMut;
use futures_cpupool::CpuPool; use futures_cpupool::CpuPool;
use http::StatusCode; use http::StatusCode;
use lazycell::LazyCell;
use parking_lot::Mutex; use parking_lot::Mutex;
use time; use time;
use lazycell::LazyCell;
use super::channel::Node; use super::channel::Node;
use super::message::{Request, RequestPool}; use super::message::{Request, RequestPool};
@ -151,7 +151,7 @@ pub(crate) struct WorkerSettings<H> {
bytes: Rc<SharedBytesPool>, bytes: Rc<SharedBytesPool>,
messages: &'static RequestPool, messages: &'static RequestPool,
channels: Cell<usize>, channels: Cell<usize>,
node: Box<Node<()>>, node: RefCell<Node<()>>,
date: UnsafeCell<Date>, date: UnsafeCell<Date>,
} }
@ -170,7 +170,7 @@ impl<H> WorkerSettings<H> {
bytes: Rc::new(SharedBytesPool::new()), bytes: Rc::new(SharedBytesPool::new()),
messages: RequestPool::pool(settings), messages: RequestPool::pool(settings),
channels: Cell::new(0), channels: Cell::new(0),
node: Box::new(Node::head()), node: RefCell::new(Node::head()),
date: UnsafeCell::new(Date::new()), date: UnsafeCell::new(Date::new()),
keep_alive, keep_alive,
ka_enabled, ka_enabled,
@ -181,8 +181,8 @@ impl<H> WorkerSettings<H> {
self.channels.get() self.channels.get()
} }
pub fn head(&self) -> &Node<()> { pub fn head(&self) -> RefMut<Node<()>> {
&self.node self.node.borrow_mut()
} }
pub fn handlers(&self) -> RefMut<Vec<H>> { pub fn handlers(&self) -> RefMut<Vec<H>> {

View File

@ -477,10 +477,7 @@ impl<H: IntoHttpHandler> HttpServer<H> {
/// .run(); /// .run();
/// } /// }
/// ``` /// ```
pub fn run(mut self) { pub fn run(self) {
self.exit = true;
self.no_signals = false;
let sys = System::new("http-server"); let sys = System::new("http-server");
self.start(); self.start();
sys.run(); sys.run();