1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-24 07:53:00 +01:00

fix http/2 error handling

This commit is contained in:
Nikolay Kim 2018-08-07 20:48:25 -07:00
parent 57f991280c
commit 30769e3072
6 changed files with 116 additions and 58 deletions

View File

@ -23,6 +23,8 @@
* Panic during access without routing being set #452 * Panic during access without routing being set #452
* Fixed http/2 error handling
### Deprecated ### Deprecated
* `HttpServer::no_http2()` is deprecated, use `OpensslAcceptor::with_flags()` or * `HttpServer::no_http2()` is deprecated, use `OpensslAcceptor::with_flags()` or

View File

@ -42,13 +42,6 @@ enum PipelineState<S, H> {
} }
impl<S: 'static, H: PipelineHandler<S>> PipelineState<S, H> { impl<S: 'static, H: PipelineHandler<S>> PipelineState<S, H> {
fn is_response(&self) -> bool {
match *self {
PipelineState::Response(_) => true,
_ => false,
}
}
fn poll( fn poll(
&mut self, info: &mut PipelineInfo<S>, mws: &[Box<Middleware<S>>], &mut self, info: &mut PipelineInfo<S>, mws: &[Box<Middleware<S>>],
) -> Option<PipelineState<S, H>> { ) -> Option<PipelineState<S, H>> {
@ -58,7 +51,8 @@ impl<S: 'static, H: PipelineHandler<S>> PipelineState<S, H> {
PipelineState::RunMiddlewares(ref mut state) => state.poll(info, mws), PipelineState::RunMiddlewares(ref mut state) => state.poll(info, mws),
PipelineState::Finishing(ref mut state) => state.poll(info, mws), PipelineState::Finishing(ref mut state) => state.poll(info, mws),
PipelineState::Completed(ref mut state) => state.poll(info), PipelineState::Completed(ref mut state) => state.poll(info),
PipelineState::Response(_) | PipelineState::None | PipelineState::Error => { PipelineState::Response(ref mut state) => state.poll(info, mws),
PipelineState::None | PipelineState::Error => {
None None
} }
} }
@ -130,7 +124,6 @@ impl<S: 'static, H: PipelineHandler<S>> HttpHandlerTask for Pipeline<S, H> {
let mut state = mem::replace(&mut self.1, PipelineState::None); let mut state = mem::replace(&mut self.1, PipelineState::None);
loop { loop {
if state.is_response() {
if let PipelineState::Response(st) = state { if let PipelineState::Response(st) = state {
match st.poll_io(io, &mut self.0, &self.2) { match st.poll_io(io, &mut self.0, &self.2) {
Ok(state) => { Ok(state) => {
@ -147,7 +140,6 @@ impl<S: 'static, H: PipelineHandler<S>> HttpHandlerTask for Pipeline<S, H> {
} }
} }
} }
}
match state { match state {
PipelineState::None => return Ok(Async::Ready(true)), PipelineState::None => return Ok(Async::Ready(true)),
PipelineState::Error => { PipelineState::Error => {
@ -401,7 +393,7 @@ impl<S: 'static, H> RunMiddlewares<S, H> {
} }
struct ProcessResponse<S, H> { struct ProcessResponse<S, H> {
resp: HttpResponse, resp: Option<HttpResponse>,
iostate: IOState, iostate: IOState,
running: RunningState, running: RunningState,
drain: Option<oneshot::Sender<()>>, drain: Option<oneshot::Sender<()>>,
@ -442,7 +434,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
#[inline] #[inline]
fn init(resp: HttpResponse) -> PipelineState<S, H> { fn init(resp: HttpResponse) -> PipelineState<S, H> {
PipelineState::Response(ProcessResponse { PipelineState::Response(ProcessResponse {
resp, resp: Some(resp),
iostate: IOState::Response, iostate: IOState::Response,
running: RunningState::Running, running: RunningState::Running,
drain: None, drain: None,
@ -451,6 +443,59 @@ impl<S: 'static, H> ProcessResponse<S, H> {
}) })
} }
fn poll(
&mut self, info: &mut PipelineInfo<S>, mws: &[Box<Middleware<S>>],
) -> Option<PipelineState<S, H>> {
println!("POLL");
// connection is dead at this point
match mem::replace(&mut self.iostate, IOState::Done) {
IOState::Response =>
Some(FinishingMiddlewares::init(info, mws, self.resp.take().unwrap())),
IOState::Payload(_) =>
Some(FinishingMiddlewares::init(info, mws, self.resp.take().unwrap())),
IOState::Actor(mut ctx) => {
if info.disconnected.take().is_some() {
ctx.disconnected();
}
loop {
match ctx.poll() {
Ok(Async::Ready(Some(vec))) => {
if vec.is_empty() {
continue;
}
for frame in vec {
match frame {
Frame::Chunk(None) => {
info.context = Some(ctx);
return Some(FinishingMiddlewares::init(
info, mws, self.resp.take().unwrap(),
))
}
Frame::Chunk(Some(_)) => (),
Frame::Drain(fut) => {let _ = fut.send(());},
}
}
}
Ok(Async::Ready(None)) =>
return Some(FinishingMiddlewares::init(
info, mws, self.resp.take().unwrap(),
)),
Ok(Async::NotReady) => {
self.iostate = IOState::Actor(ctx);
return None;
}
Err(err) => {
info.context = Some(ctx);
info.error = Some(err);
return Some(FinishingMiddlewares::init(info, mws, self.resp.take().unwrap()));
}
}
}
}
IOState::Done => Some(FinishingMiddlewares::init(info, mws, self.resp.take().unwrap()))
}
}
fn poll_io( fn poll_io(
mut self, io: &mut Writer, info: &mut PipelineInfo<S>, mut self, io: &mut Writer, info: &mut PipelineInfo<S>,
mws: &[Box<Middleware<S>>], mws: &[Box<Middleware<S>>],
@ -462,24 +507,24 @@ impl<S: 'static, H> ProcessResponse<S, H> {
let result = match mem::replace(&mut self.iostate, IOState::Done) { let result = match mem::replace(&mut self.iostate, IOState::Done) {
IOState::Response => { IOState::Response => {
let encoding = let encoding =
self.resp.content_encoding().unwrap_or(info.encoding); self.resp.as_ref().unwrap().content_encoding().unwrap_or(info.encoding);
let result = let result =
match io.start(&info.req, &mut self.resp, encoding) { match io.start(&info.req, self.resp.as_mut().unwrap(), encoding) {
Ok(res) => res, Ok(res) => res,
Err(err) => { Err(err) => {
info.error = Some(err.into()); info.error = Some(err.into());
return Ok(FinishingMiddlewares::init( return Ok(FinishingMiddlewares::init(
info, mws, self.resp, info, mws, self.resp.take().unwrap(),
)); ));
} }
}; };
if let Some(err) = self.resp.error() { if let Some(err) = self.resp.as_ref().unwrap().error() {
if self.resp.status().is_server_error() { if self.resp.as_ref().unwrap().status().is_server_error() {
error!( error!(
"Error occured during request handling, status: {} {}", "Error occured during request handling, status: {} {}",
self.resp.status(), err self.resp.as_ref().unwrap().status(), err
); );
} else { } else {
warn!( warn!(
@ -493,7 +538,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
} }
// always poll stream or actor for the first time // always poll stream or actor for the first time
match self.resp.replace_body(Body::Empty) { match self.resp.as_mut().unwrap().replace_body(Body::Empty) {
Body::Streaming(stream) => { Body::Streaming(stream) => {
self.iostate = IOState::Payload(stream); self.iostate = IOState::Payload(stream);
continue 'inner; continue 'inner;
@ -512,7 +557,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
if let Err(err) = io.write_eof() { if let Err(err) = io.write_eof() {
info.error = Some(err.into()); info.error = Some(err.into());
return Ok(FinishingMiddlewares::init( return Ok(FinishingMiddlewares::init(
info, mws, self.resp, info, mws, self.resp.take().unwrap(),
)); ));
} }
break; break;
@ -523,7 +568,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
Err(err) => { Err(err) => {
info.error = Some(err.into()); info.error = Some(err.into());
return Ok(FinishingMiddlewares::init( return Ok(FinishingMiddlewares::init(
info, mws, self.resp, info, mws, self.resp.take().unwrap(),
)); ));
} }
Ok(result) => result, Ok(result) => result,
@ -536,7 +581,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
Err(err) => { Err(err) => {
info.error = Some(err); info.error = Some(err);
return Ok(FinishingMiddlewares::init( return Ok(FinishingMiddlewares::init(
info, mws, self.resp, info, mws, self.resp.take().unwrap(),
)); ));
} }
}, },
@ -559,7 +604,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
info.error = Some(err.into()); info.error = Some(err.into());
return Ok( return Ok(
FinishingMiddlewares::init( FinishingMiddlewares::init(
info, mws, self.resp, info, mws, self.resp.take().unwrap(),
), ),
); );
} }
@ -572,7 +617,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
info.error = Some(err.into()); info.error = Some(err.into());
return Ok( return Ok(
FinishingMiddlewares::init( FinishingMiddlewares::init(
info, mws, self.resp, info, mws, self.resp.take().unwrap(),
), ),
); );
} }
@ -598,7 +643,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
info.context = Some(ctx); info.context = Some(ctx);
info.error = Some(err); info.error = Some(err);
return Ok(FinishingMiddlewares::init( return Ok(FinishingMiddlewares::init(
info, mws, self.resp, info, mws, self.resp.take().unwrap(),
)); ));
} }
} }
@ -638,7 +683,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
info.context = Some(ctx); info.context = Some(ctx);
} }
info.error = Some(err.into()); info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(info, mws, self.resp)); return Ok(FinishingMiddlewares::init(info, mws, self.resp.take().unwrap()));
} }
} }
} }
@ -652,11 +697,11 @@ impl<S: 'static, H> ProcessResponse<S, H> {
Ok(_) => (), Ok(_) => (),
Err(err) => { Err(err) => {
info.error = Some(err.into()); info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(info, mws, self.resp)); return Ok(FinishingMiddlewares::init(info, mws, self.resp.take().unwrap()));
} }
} }
self.resp.set_response_size(io.written()); self.resp.as_mut().unwrap().set_response_size(io.written());
Ok(FinishingMiddlewares::init(info, mws, self.resp)) Ok(FinishingMiddlewares::init(info, mws, self.resp.take().unwrap()))
} }
_ => Err(PipelineState::Response(self)), _ => Err(PipelineState::Response(self)),
} }

View File

@ -102,13 +102,19 @@ where
loop { loop {
let mut not_ready = true; let mut not_ready = true;
let disconnected = self.flags.contains(Flags::DISCONNECTED);
// check in-flight connections // check in-flight connections
for item in &mut self.tasks { for item in &mut self.tasks {
// read payload // read payload
if !disconnected {
item.poll_payload(); item.poll_payload();
}
if !item.flags.contains(EntryFlags::EOF) { if !item.flags.contains(EntryFlags::EOF) {
if disconnected {
item.flags.insert(EntryFlags::EOF);
} else {
let retry = item.payload.need_read() == PayloadStatus::Read; let retry = item.payload.need_read() == PayloadStatus::Read;
loop { loop {
match item.task.poll_io(&mut item.stream) { match item.task.poll_io(&mut item.stream) {
@ -141,12 +147,14 @@ where
} }
break; break;
} }
} else if !item.flags.contains(EntryFlags::FINISHED) { }
}
if item.flags.contains(EntryFlags::EOF) && !item.flags.contains(EntryFlags::FINISHED) {
match item.task.poll_completed() { match item.task.poll_completed() {
Ok(Async::NotReady) => (), Ok(Async::NotReady) => (),
Ok(Async::Ready(_)) => { Ok(Async::Ready(_)) => {
not_ready = false; item.flags.insert(EntryFlags::FINISHED | EntryFlags::WRITE_DONE);
item.flags.insert(EntryFlags::FINISHED);
} }
Err(err) => { Err(err) => {
item.flags.insert( item.flags.insert(
@ -161,6 +169,7 @@ where
if item.flags.contains(EntryFlags::FINISHED) if item.flags.contains(EntryFlags::FINISHED)
&& !item.flags.contains(EntryFlags::WRITE_DONE) && !item.flags.contains(EntryFlags::WRITE_DONE)
&& !disconnected
{ {
match item.stream.poll_completed(false) { match item.stream.poll_completed(false) {
Ok(Async::NotReady) => (), Ok(Async::NotReady) => (),
@ -168,7 +177,7 @@ where
not_ready = false; not_ready = false;
item.flags.insert(EntryFlags::WRITE_DONE); item.flags.insert(EntryFlags::WRITE_DONE);
} }
Err(_err) => { Err(_) => {
item.flags.insert(EntryFlags::ERROR); item.flags.insert(EntryFlags::ERROR);
} }
} }
@ -177,7 +186,7 @@ where
// cleanup finished tasks // cleanup finished tasks
while !self.tasks.is_empty() { while !self.tasks.is_empty() {
if self.tasks[0].flags.contains(EntryFlags::EOF) if self.tasks[0].flags.contains(EntryFlags::FINISHED)
&& self.tasks[0].flags.contains(EntryFlags::WRITE_DONE) && self.tasks[0].flags.contains(EntryFlags::WRITE_DONE)
|| self.tasks[0].flags.contains(EntryFlags::ERROR) || self.tasks[0].flags.contains(EntryFlags::ERROR)
{ {
@ -397,6 +406,7 @@ impl<H: HttpHandler + 'static> Entry<H> {
} }
Ok(Async::NotReady) => break, Ok(Async::NotReady) => break,
Err(err) => { Err(err) => {
println!("POLL-PAYLOAD error: {:?}", err);
self.payload.set_error(PayloadError::Http2(err)); self.payload.set_error(PayloadError::Http2(err));
break; break;
} }

View File

@ -167,7 +167,6 @@ impl<H: 'static> Writer for H2Writer<H> {
Ok(WriterState::Done) Ok(WriterState::Done)
} else { } else {
self.flags.insert(Flags::EOF); self.flags.insert(Flags::EOF);
self.written = bytes.len() as u64;
self.buffer.write(bytes.as_ref())?; self.buffer.write(bytes.as_ref())?;
if let Some(ref mut stream) = self.stream { if let Some(ref mut stream) = self.stream {
self.flags.insert(Flags::RESERVED); self.flags.insert(Flags::RESERVED);
@ -183,8 +182,6 @@ impl<H: 'static> Writer for H2Writer<H> {
} }
fn write(&mut self, payload: &Binary) -> io::Result<WriterState> { fn write(&mut self, payload: &Binary) -> io::Result<WriterState> {
self.written = payload.len() as u64;
if !self.flags.contains(Flags::DISCONNECTED) { if !self.flags.contains(Flags::DISCONNECTED) {
if self.flags.contains(Flags::STARTED) { if self.flags.contains(Flags::STARTED) {
// TODO: add warning, write after EOF // TODO: add warning, write after EOF
@ -253,7 +250,9 @@ impl<H: 'static> Writer for H2Writer<H> {
return Ok(Async::Ready(())); return Ok(Async::Ready(()));
} }
} }
Err(e) => return Err(io::Error::new(io::ErrorKind::Other, e)), Err(e) => {
return Err(io::Error::new(io::ErrorKind::Other, e))
}
} }
} }
} }

View File

@ -15,8 +15,10 @@ use tokio::runtime::current_thread::Runtime;
#[cfg(feature = "alpn")] #[cfg(feature = "alpn")]
use openssl::ssl::SslAcceptorBuilder; use openssl::ssl::SslAcceptorBuilder;
#[cfg(all(feature = "rust-tls"))] #[cfg(feature = "rust-tls")]
use rustls::ServerConfig; use rustls::ServerConfig;
#[cfg(feature = "rust-tls")]
use server::RustlsAcceptor;
use application::{App, HttpApplication}; use application::{App, HttpApplication};
use body::Binary; use body::Binary;
@ -342,7 +344,7 @@ impl<S: 'static> TestServerBuilder<S> {
let ssl = self.rust_ssl.take(); let ssl = self.rust_ssl.take();
if let Some(ssl) = ssl { if let Some(ssl) = ssl {
let tcp = net::TcpListener::bind(addr).unwrap(); let tcp = net::TcpListener::bind(addr).unwrap();
srv = srv.listen_rustls(tcp, ssl).unwrap(); srv = srv.listen_with(tcp, RustlsAcceptor::new(ssl)).unwrap();
} }
} }
if !has_ssl { if !has_ssl {

View File

@ -210,7 +210,7 @@ impl Ws2 {
ctx.drain() ctx.drain()
.and_then(|_, act, ctx| { .and_then(|_, act, ctx| {
act.count += 1; act.count += 1;
if act.count != 10_000 { if act.count != 1_000 {
act.send(ctx); act.send(ctx);
} }
actix::fut::ok(()) actix::fut::ok(())
@ -248,7 +248,7 @@ fn test_server_send_text() {
}); });
let (mut reader, _writer) = srv.ws().unwrap(); let (mut reader, _writer) = srv.ws().unwrap();
for _ in 0..10_000 { for _ in 0..1_000 {
let (item, r) = srv.execute(reader.into_future()).unwrap(); let (item, r) = srv.execute(reader.into_future()).unwrap();
reader = r; reader = r;
assert_eq!(item, data); assert_eq!(item, data);
@ -272,7 +272,7 @@ fn test_server_send_bin() {
}); });
let (mut reader, _writer) = srv.ws().unwrap(); let (mut reader, _writer) = srv.ws().unwrap();
for _ in 0..10_000 { for _ in 0..1_000 {
let (item, r) = srv.execute(reader.into_future()).unwrap(); let (item, r) = srv.execute(reader.into_future()).unwrap();
reader = r; reader = r;
assert_eq!(item, data); assert_eq!(item, data);
@ -308,7 +308,7 @@ fn test_ws_server_ssl() {
let (mut reader, _writer) = srv.ws().unwrap(); let (mut reader, _writer) = srv.ws().unwrap();
let data = Some(ws::Message::Text("0".repeat(65_536))); let data = Some(ws::Message::Text("0".repeat(65_536)));
for _ in 0..10_000 { for _ in 0..1_000 {
let (item, r) = srv.execute(reader.into_future()).unwrap(); let (item, r) = srv.execute(reader.into_future()).unwrap();
reader = r; reader = r;
assert_eq!(item, data); assert_eq!(item, data);
@ -347,7 +347,7 @@ fn test_ws_server_rust_tls() {
let (mut reader, _writer) = srv.ws().unwrap(); let (mut reader, _writer) = srv.ws().unwrap();
let data = Some(ws::Message::Text("0".repeat(65_536))); let data = Some(ws::Message::Text("0".repeat(65_536)));
for _ in 0..10_000 { for _ in 0..1_000 {
let (item, r) = srv.execute(reader.into_future()).unwrap(); let (item, r) = srv.execute(reader.into_future()).unwrap();
reader = r; reader = r;
assert_eq!(item, data); assert_eq!(item, data);