1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-24 16:02:59 +01:00

better error handling

This commit is contained in:
Nikolay Kim 2018-05-17 10:58:08 -07:00
parent 8de1f60347
commit f3ece74406
3 changed files with 34 additions and 31 deletions

View File

@ -38,7 +38,9 @@ where
H: HttpHandler + 'static, H: HttpHandler + 'static,
{ {
pub(crate) fn new( pub(crate) fn new(
settings: Rc<WorkerSettings<H>>, mut io: T, peer: Option<SocketAddr>, settings: Rc<WorkerSettings<H>>,
mut io: T,
peer: Option<SocketAddr>,
http2: bool, http2: bool,
) -> HttpChannel<T, H> { ) -> HttpChannel<T, H> {
settings.add_channel(); settings.add_channel();
@ -61,7 +63,7 @@ where
settings, settings,
peer, peer,
io, io,
BytesMut::with_capacity(4096), BytesMut::with_capacity(8192),
)), )),
} }
} }
@ -93,12 +95,12 @@ where
let el = self as *mut _; let el = self as *mut _;
self.node = Some(Node::new(el)); self.node = Some(Node::new(el));
let _ = match self.proto { let _ = match self.proto {
Some(HttpProtocol::H1(ref mut h1)) => self.node Some(HttpProtocol::H1(ref mut h1)) => {
.as_ref() self.node.as_ref().map(|n| h1.settings().head().insert(n))
.map(|n| h1.settings().head().insert(n)), }
Some(HttpProtocol::H2(ref mut h2)) => self.node Some(HttpProtocol::H2(ref mut h2)) => {
.as_ref() self.node.as_ref().map(|n| h2.settings().head().insert(n))
.map(|n| h2.settings().head().insert(n)), }
Some(HttpProtocol::Unknown(ref mut settings, _, _, _)) => { Some(HttpProtocol::Unknown(ref mut settings, _, _, _)) => {
self.node.as_ref().map(|n| settings.head().insert(n)) self.node.as_ref().map(|n| settings.head().insert(n))
} }
@ -168,9 +170,8 @@ where
if let Some(HttpProtocol::Unknown(settings, addr, io, buf)) = self.proto.take() { if let Some(HttpProtocol::Unknown(settings, addr, io, buf)) = self.proto.take() {
match kind { match kind {
ProtocolKind::Http1 => { ProtocolKind::Http1 => {
self.proto = Some(HttpProtocol::H1(h1::Http1::new( self.proto =
settings, io, addr, buf, Some(HttpProtocol::H1(h1::Http1::new(settings, io, addr, buf)));
)));
return self.poll(); return self.poll();
} }
ProtocolKind::Http2 => { ProtocolKind::Http2 => {

View File

@ -162,7 +162,6 @@ where
entry.pipe.disconnected() entry.pipe.disconnected()
} }
// kill keepalive // kill keepalive
self.flags.remove(Flags::KEEPALIVE);
self.keepalive_timer.take(); self.keepalive_timer.take();
// on parse error, stop reading stream but tasks need to be // on parse error, stop reading stream but tasks need to be
@ -352,7 +351,7 @@ where
Ok(Async::NotReady) => {} Ok(Async::NotReady) => {}
Err(err) => { Err(err) => {
error!("Unhandled error: {}", err); error!("Unhandled error: {}", err);
self.flags.intersects(Flags::ERROR); self.flags.insert(Flags::ERROR);
return; return;
} }
} }

View File

@ -46,7 +46,9 @@ impl H1Decoder {
} }
pub fn decode<H>( pub fn decode<H>(
&mut self, src: &mut BytesMut, settings: &WorkerSettings<H>, &mut self,
src: &mut BytesMut,
settings: &WorkerSettings<H>,
) -> Result<Option<Message>, DecoderError> { ) -> Result<Option<Message>, DecoderError> {
// read payload // read payload
if self.decoder.is_some() { if self.decoder.is_some() {
@ -64,18 +66,11 @@ impl H1Decoder {
.map_err(DecoderError::Error)? .map_err(DecoderError::Error)?
{ {
Async::Ready((msg, decoder)) => { Async::Ready((msg, decoder)) => {
if let Some(decoder) = decoder { self.decoder = decoder;
self.decoder = Some(decoder); Ok(Some(Message::Message {
Ok(Some(Message::Message { msg,
msg, payload: self.decoder.is_some(),
payload: true, }))
}))
} else {
Ok(Some(Message::Message {
msg,
payload: false,
}))
}
} }
Async::NotReady => { Async::NotReady => {
if src.len() >= MAX_BUFFER_SIZE { if src.len() >= MAX_BUFFER_SIZE {
@ -89,7 +84,9 @@ impl H1Decoder {
} }
fn parse_message<H>( fn parse_message<H>(
&self, buf: &mut BytesMut, settings: &WorkerSettings<H>, &self,
buf: &mut BytesMut,
settings: &WorkerSettings<H>,
) -> Poll<(SharedHttpInnerMessage, Option<EncodingDecoder>), ParseError> { ) -> Poll<(SharedHttpInnerMessage, Option<EncodingDecoder>), ParseError> {
// Parse http message // Parse http message
let mut has_upgrade = false; let mut has_upgrade = false;
@ -148,7 +145,7 @@ impl H1Decoder {
header::CONTENT_LENGTH => { header::CONTENT_LENGTH => {
if let Ok(s) = value.to_str() { if let Ok(s) = value.to_str() {
if let Ok(len) = s.parse::<u64>() { if let Ok(len) = s.parse::<u64>() {
content_length = Some(len) content_length = Some(len);
} else { } else {
debug!("illegal Content-Length: {:?}", len); debug!("illegal Content-Length: {:?}", len);
return Err(ParseError::Header); return Err(ParseError::Header);
@ -351,7 +348,10 @@ macro_rules! byte (
impl ChunkedState { impl ChunkedState {
fn step( fn step(
&self, body: &mut BytesMut, size: &mut u64, buf: &mut Option<Bytes>, &self,
body: &mut BytesMut,
size: &mut u64,
buf: &mut Option<Bytes>,
) -> Poll<ChunkedState, io::Error> { ) -> Poll<ChunkedState, io::Error> {
use self::ChunkedState::*; use self::ChunkedState::*;
match *self { match *self {
@ -414,7 +414,8 @@ impl ChunkedState {
} }
} }
fn read_size_lf( fn read_size_lf(
rdr: &mut BytesMut, size: &mut u64, rdr: &mut BytesMut,
size: &mut u64,
) -> Poll<ChunkedState, io::Error> { ) -> Poll<ChunkedState, io::Error> {
match byte!(rdr) { match byte!(rdr) {
b'\n' if *size > 0 => Ok(Async::Ready(ChunkedState::Body)), b'\n' if *size > 0 => Ok(Async::Ready(ChunkedState::Body)),
@ -427,7 +428,9 @@ impl ChunkedState {
} }
fn read_body( fn read_body(
rdr: &mut BytesMut, rem: &mut u64, buf: &mut Option<Bytes>, rdr: &mut BytesMut,
rem: &mut u64,
buf: &mut Option<Bytes>,
) -> Poll<ChunkedState, io::Error> { ) -> Poll<ChunkedState, io::Error> {
trace!("Chunked read, remaining={:?}", rem); trace!("Chunked read, remaining={:?}", rem);