1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 09:42:40 +01:00

h1 decoder blocks on error #222

This commit is contained in:
Nikolay Kim 2018-05-15 07:55:36 -07:00
parent d65a03f6ac
commit 5ea2d68438

View File

@ -67,7 +67,9 @@ where
H: HttpHandler + 'static, H: HttpHandler + 'static,
{ {
pub fn new( pub fn new(
settings: Rc<WorkerSettings<H>>, stream: T, addr: Option<SocketAddr>, settings: Rc<WorkerSettings<H>>,
stream: T,
addr: Option<SocketAddr>,
buf: BytesMut, buf: BytesMut,
) -> Self { ) -> Self {
let bytes = settings.get_shared_bytes(); let bytes = settings.get_shared_bytes();
@ -149,7 +151,8 @@ where
pub fn poll_io(&mut self) { pub fn poll_io(&mut self) {
// read io from socket // read io from socket
if !self.flags.intersects(Flags::ERROR) if !self.flags.intersects(Flags::ERROR)
&& self.tasks.len() < MAX_PIPELINED_MESSAGES && self.can_read() && self.tasks.len() < MAX_PIPELINED_MESSAGES
&& self.can_read()
{ {
if self.read() { if self.read() {
// notify all tasks // notify all tasks
@ -205,8 +208,7 @@ where
self.stream.reset(); self.stream.reset();
if ready { if ready {
item.flags item.flags.insert(EntryFlags::EOF | EntryFlags::FINISHED);
.insert(EntryFlags::EOF | EntryFlags::FINISHED);
} else { } else {
item.flags.insert(EntryFlags::FINISHED); item.flags.insert(EntryFlags::FINISHED);
} }
@ -347,6 +349,7 @@ where
} else { } else {
error!("Internal server error: unexpected payload chunk"); error!("Internal server error: unexpected payload chunk");
self.flags.insert(Flags::ERROR); self.flags.insert(Flags::ERROR);
break;
} }
} }
Ok(Some(Message::Eof)) => { Ok(Some(Message::Eof)) => {
@ -355,6 +358,7 @@ where
} else { } else {
error!("Internal server error: unexpected eof"); error!("Internal server error: unexpected eof");
self.flags.insert(Flags::ERROR); self.flags.insert(Flags::ERROR);
break;
} }
} }
Ok(None) => break, Ok(None) => break,
@ -367,6 +371,7 @@ where
}; };
payload.set_error(e); payload.set_error(e);
} }
break;
} }
} }
} }
@ -614,10 +619,7 @@ mod tests {
assert_eq!(req.version(), Version::HTTP_11); assert_eq!(req.version(), Version::HTTP_11);
assert_eq!(*req.method(), Method::GET); assert_eq!(*req.method(), Method::GET);
assert_eq!(req.path(), "/test"); assert_eq!(req.path(), "/test");
assert_eq!( assert_eq!(req.headers().get("test").unwrap().as_bytes(), b"value");
req.headers().get("test").unwrap().as_bytes(),
b"value"
);
} }
Ok(_) | Err(_) => unreachable!("Error during parsing http request"), Ok(_) | Err(_) => unreachable!("Error during parsing http request"),
} }
@ -918,13 +920,7 @@ mod tests {
.as_ref(), .as_ref(),
b"line" b"line"
); );
assert!( assert!(reader.decode(&mut buf, &settings).unwrap().unwrap().eof());
reader
.decode(&mut buf, &settings)
.unwrap()
.unwrap()
.eof()
);
} }
#[test] #[test]
@ -1005,13 +1001,7 @@ mod tests {
assert!(reader.decode(&mut buf, &settings).unwrap().is_none()); assert!(reader.decode(&mut buf, &settings).unwrap().is_none());
buf.extend(b"\r\n"); buf.extend(b"\r\n");
assert!( assert!(reader.decode(&mut buf, &settings).unwrap().unwrap().eof());
reader
.decode(&mut buf, &settings)
.unwrap()
.unwrap()
.eof()
);
} }
#[test] #[test]
@ -1029,17 +1019,9 @@ mod tests {
assert!(req.chunked().unwrap()); assert!(req.chunked().unwrap());
buf.extend(b"4;test\r\ndata\r\n4\r\nline\r\n0\r\n\r\n"); // test: test\r\n\r\n") buf.extend(b"4;test\r\ndata\r\n4\r\nline\r\n0\r\n\r\n"); // test: test\r\n\r\n")
let chunk = reader let chunk = reader.decode(&mut buf, &settings).unwrap().unwrap().chunk();
.decode(&mut buf, &settings)
.unwrap()
.unwrap()
.chunk();
assert_eq!(chunk, Bytes::from_static(b"data")); assert_eq!(chunk, Bytes::from_static(b"data"));
let chunk = reader let chunk = reader.decode(&mut buf, &settings).unwrap().unwrap().chunk();
.decode(&mut buf, &settings)
.unwrap()
.unwrap()
.chunk();
assert_eq!(chunk, Bytes::from_static(b"line")); assert_eq!(chunk, Bytes::from_static(b"line"));
let msg = reader.decode(&mut buf, &settings).unwrap().unwrap(); let msg = reader.decode(&mut buf, &settings).unwrap().unwrap();
assert!(msg.eof()); assert!(msg.eof());