1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-08-16 10:59:01 +02:00

Compare commits

..

10 Commits

Author SHA1 Message Date
Nikolay Kim
ab978a18ff unix only test 2018-03-03 18:50:00 -08:00
Nikolay Kim
327df159c6 prepare release 2018-03-03 18:46:22 -08:00
Nikolay Kim
2ccbd5fa18 fix socket polling 2018-03-03 12:17:26 -08:00
Nikolay Kim
058630d041 simplify channels list management 2018-03-03 11:16:55 -08:00
Nikolay Kim
f456be0309 simplify linked nodes 2018-03-03 10:06:13 -08:00
Nikolay Kim
9bd6cb03ac Merge branch 'master' of github.com:actix/actix-web 2018-03-03 09:29:46 -08:00
Nikolay Kim
16afeda79c update changes 2018-03-03 09:29:36 -08:00
Nikolay Kim
83fcdfd91f fix potential bug in payload processing 2018-03-03 09:27:54 -08:00
Nikolay Kim
8f94ae41cc Merge pull request #90 from rvlzzr/master
move reuse_address before bind
2018-03-02 23:08:33 -08:00
Anti Revoluzzer
4e41347de8 move reuse_address before bind 2018-03-02 22:57:11 -08:00
8 changed files with 125 additions and 114 deletions

View File

@@ -1,5 +1,14 @@
# Changes # Changes
## 0.4.3 (2018-03-03)
* Fix request body read bug
* Fix segmentation fault #79
* Set reuse address before bind #90
## 0.4.2 (2018-03-02) ## 0.4.2 (2018-03-02)
* Better naming for websockets implementation * Better naming for websockets implementation

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "actix-web" name = "actix-web"
version = "0.4.2" version = "0.4.3"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix web is a small, pragmatic, extremely fast, web framework for Rust." description = "Actix web is a small, pragmatic, extremely fast, web framework for Rust."
readme = "README.md" readme = "README.md"

View File

@@ -2,7 +2,7 @@ use std::mem;
use httparse; use httparse;
use http::{Version, HttpTryFrom, HeaderMap, StatusCode}; use http::{Version, HttpTryFrom, HeaderMap, StatusCode};
use http::header::{self, HeaderName, HeaderValue}; use http::header::{self, HeaderName, HeaderValue};
use bytes::{Bytes, BytesMut, BufMut}; use bytes::{Bytes, BytesMut};
use futures::{Poll, Async}; use futures::{Poll, Async};
use error::{ParseError, PayloadError}; use error::{ParseError, PayloadError};
@@ -37,7 +37,7 @@ impl HttpResponseParser {
where T: IoStream where T: IoStream
{ {
// if buf is empty parse_message will always return NotReady, let's avoid that // if buf is empty parse_message will always return NotReady, let's avoid that
let read = if buf.is_empty() { if buf.is_empty() {
match utils::read_from_io(io, buf) { match utils::read_from_io(io, buf) {
Ok(Async::Ready(0)) => Ok(Async::Ready(0)) =>
return Err(HttpResponseParserError::Disconnect), return Err(HttpResponseParserError::Disconnect),
@@ -47,13 +47,12 @@ impl HttpResponseParser {
Err(err) => Err(err) =>
return Err(HttpResponseParserError::Error(err.into())) return Err(HttpResponseParserError::Error(err.into()))
} }
false }
} else {
true
};
loop { loop {
match HttpResponseParser::parse_message(buf).map_err(HttpResponseParserError::Error)? { match HttpResponseParser::parse_message(buf)
.map_err(HttpResponseParserError::Error)?
{
Async::Ready((msg, decoder)) => { Async::Ready((msg, decoder)) => {
self.decoder = decoder; self.decoder = decoder;
return Ok(Async::Ready(msg)); return Ok(Async::Ready(msg));
@@ -62,7 +61,6 @@ impl HttpResponseParser {
if buf.capacity() >= MAX_BUFFER_SIZE { if buf.capacity() >= MAX_BUFFER_SIZE {
return Err(HttpResponseParserError::Error(ParseError::TooLarge)); return Err(HttpResponseParserError::Error(ParseError::TooLarge));
} }
if read || buf.remaining_mut() == 0 {
match utils::read_from_io(io, buf) { match utils::read_from_io(io, buf) {
Ok(Async::Ready(0)) => Ok(Async::Ready(0)) =>
return Err(HttpResponseParserError::Disconnect), return Err(HttpResponseParserError::Disconnect),
@@ -71,9 +69,6 @@ impl HttpResponseParser {
Err(err) => Err(err) =>
return Err(HttpResponseParserError::Error(err.into())), return Err(HttpResponseParserError::Error(err.into())),
} }
} else {
return Ok(Async::NotReady)
}
}, },
} }
} }
@@ -84,25 +79,34 @@ impl HttpResponseParser {
where T: IoStream where T: IoStream
{ {
if self.decoder.is_some() { if self.decoder.is_some() {
loop {
// read payload // read payload
match utils::read_from_io(io, buf) { let not_ready = match utils::read_from_io(io, buf) {
Ok(Async::Ready(0)) => { Ok(Async::Ready(0)) => {
if buf.is_empty() { if buf.is_empty() {
return Err(PayloadError::Incomplete) return Err(PayloadError::Incomplete)
} }
true
} }
Err(err) => return Err(err.into()), Err(err) => return Err(err.into()),
_ => (), Ok(Async::NotReady) => true,
} _ => false,
};
match self.decoder.as_mut().unwrap().decode(buf) { match self.decoder.as_mut().unwrap().decode(buf) {
Ok(Async::Ready(Some(b))) => Ok(Async::Ready(Some(b))), Ok(Async::Ready(Some(b))) =>
return Ok(Async::Ready(Some(b))),
Ok(Async::Ready(None)) => { Ok(Async::Ready(None)) => {
self.decoder.take(); self.decoder.take();
Ok(Async::Ready(None)) return Ok(Async::Ready(None))
}
Ok(Async::NotReady) => {
if not_ready {
return Ok(Async::NotReady)
}
}
Err(err) => return Err(err.into()),
} }
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(err) => Err(err.into()),
} }
} else { } else {
Ok(Async::Ready(None)) Ok(Async::Ready(None))

View File

@@ -18,15 +18,6 @@ enum HttpProtocol<T: IoStream, H: 'static> {
Unknown(Rc<WorkerSettings<H>>, Option<SocketAddr>, T, BytesMut), Unknown(Rc<WorkerSettings<H>>, Option<SocketAddr>, T, BytesMut),
} }
impl<T: IoStream, H: 'static> HttpProtocol<T, H> {
fn is_unknown(&self) -> bool {
match *self {
HttpProtocol::Unknown(_, _, _, _) => true,
_ => false
}
}
}
enum ProtocolKind { enum ProtocolKind {
Http1, Http1,
Http2, Http2,
@@ -44,15 +35,14 @@ impl<T, H> HttpChannel<T, H> where T: IoStream, H: HttpHandler + 'static
io: T, peer: Option<SocketAddr>, http2: bool) -> HttpChannel<T, H> io: T, peer: Option<SocketAddr>, http2: bool) -> HttpChannel<T, H>
{ {
settings.add_channel(); settings.add_channel();
if http2 { if http2 {
HttpChannel { HttpChannel {
node: None, node: None, proto: Some(HttpProtocol::H2(
proto: Some(HttpProtocol::H2(
h2::Http2::new(settings, io, peer, Bytes::new()))) } h2::Http2::new(settings, io, peer, Bytes::new()))) }
} else { } else {
HttpChannel { HttpChannel {
node: None, node: None, proto: Some(HttpProtocol::Unknown(
proto: Some(HttpProtocol::Unknown(
settings, peer, io, BytesMut::with_capacity(4096))) } settings, peer, io, BytesMut::with_capacity(4096))) }
} }
} }
@@ -78,15 +68,18 @@ impl<T, H> Future for HttpChannel<T, H> where T: IoStream, H: HttpHandler + 'sta
type Error = (); type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if !self.proto.as_ref().map(|p| p.is_unknown()).unwrap_or(false) && self.node.is_none() { if !self.node.is_none() {
self.node = Some(Node::new(self)); let el = self as *mut _;
match self.proto { self.node = Some(Node::new(el));
let _ = match self.proto {
Some(HttpProtocol::H1(ref mut h1)) => Some(HttpProtocol::H1(ref mut h1)) =>
h1.settings().head().insert(self.node.as_ref().unwrap()), self.node.as_ref().map(|n| h1.settings().head().insert(n)),
Some(HttpProtocol::H2(ref mut h2)) => Some(HttpProtocol::H2(ref mut h2)) =>
h2.settings().head().insert(self.node.as_ref().unwrap()), self.node.as_ref().map(|n| h2.settings().head().insert(n)),
_ => (), Some(HttpProtocol::Unknown(ref mut settings, _, _, _)) =>
} self.node.as_ref().map(|n| settings.head().insert(n)),
None => unreachable!(),
};
} }
let kind = match self.proto { let kind = match self.proto {
@@ -95,7 +88,7 @@ impl<T, H> Future for HttpChannel<T, H> where T: IoStream, H: HttpHandler + 'sta
match result { match result {
Ok(Async::Ready(())) | Err(_) => { Ok(Async::Ready(())) | Err(_) => {
h1.settings().remove_channel(); h1.settings().remove_channel();
self.node.as_ref().unwrap().remove(); self.node.as_mut().map(|n| n.remove());
}, },
_ => (), _ => (),
} }
@@ -106,7 +99,7 @@ impl<T, H> Future for HttpChannel<T, H> where T: IoStream, H: HttpHandler + 'sta
match result { match result {
Ok(Async::Ready(())) | Err(_) => { Ok(Async::Ready(())) | Err(_) => {
h2.settings().remove_channel(); h2.settings().remove_channel();
self.node.as_ref().unwrap().remove(); self.node.as_mut().map(|n| n.remove());
}, },
_ => (), _ => (),
} }
@@ -117,6 +110,7 @@ impl<T, H> Future for HttpChannel<T, H> where T: IoStream, H: HttpHandler + 'sta
Ok(Async::Ready(0)) | Err(_) => { Ok(Async::Ready(0)) | Err(_) => {
debug!("Ignored premature client disconnection"); debug!("Ignored premature client disconnection");
settings.remove_channel(); settings.remove_channel();
self.node.as_mut().map(|n| n.remove());
return Err(()) return Err(())
}, },
_ => (), _ => (),
@@ -163,11 +157,11 @@ pub(crate) struct Node<T>
impl<T> Node<T> impl<T> Node<T>
{ {
fn new(el: &mut T) -> Self { fn new(el: *mut T) -> Self {
Node { Node {
next: None, next: None,
prev: None, prev: None,
element: el as *mut _, element: el,
} }
} }
@@ -186,13 +180,14 @@ impl<T> Node<T>
} }
} }
fn remove(&self) { fn remove(&mut self) {
#[allow(mutable_transmutes)]
unsafe { unsafe {
if let Some(ref prev) = self.prev { self.element = ptr::null_mut();
let p: &mut Node<()> = mem::transmute(prev.as_ref().unwrap()); let next = self.next.take();
let slf: &mut Node<T> = mem::transmute(self); let mut prev = self.prev.take();
p.next = slf.next.take();
if let Some(ref mut prev) = prev {
prev.as_mut().unwrap().next = next;
} }
} }
} }

View File

@@ -10,7 +10,7 @@ use actix::Arbiter;
use httparse; use httparse;
use http::{Uri, Method, Version, HttpTryFrom, HeaderMap}; use http::{Uri, Method, Version, HttpTryFrom, HeaderMap};
use http::header::{self, HeaderName, HeaderValue}; use http::header::{self, HeaderName, HeaderValue};
use bytes::{Bytes, BytesMut, BufMut}; use bytes::{Bytes, BytesMut};
use futures::{Future, Poll, Async}; use futures::{Future, Poll, Async};
use tokio_core::reactor::Timeout; use tokio_core::reactor::Timeout;
@@ -402,42 +402,51 @@ impl Reader {
// read payload // read payload
let done = { let done = {
if let Some(ref mut payload) = self.payload { if let Some(ref mut payload) = self.payload {
match utils::read_from_io(io, buf) { 'buf: loop {
let not_ready = match utils::read_from_io(io, buf) {
Ok(Async::Ready(0)) => { Ok(Async::Ready(0)) => {
payload.tx.set_error(PayloadError::Incomplete); payload.tx.set_error(PayloadError::Incomplete);
// http channel should not deal with payload errors // http channel should not deal with payload errors
return Err(ReaderError::Payload) return Err(ReaderError::Payload)
}, },
Ok(Async::NotReady) => true,
Err(err) => { Err(err) => {
payload.tx.set_error(err.into()); payload.tx.set_error(err.into());
// http channel should not deal with payload errors // http channel should not deal with payload errors
return Err(ReaderError::Payload) return Err(ReaderError::Payload)
} }
_ => (), _ => false,
} };
loop { loop {
match payload.decoder.decode(buf) { match payload.decoder.decode(buf) {
Ok(Async::Ready(Some(bytes))) => { Ok(Async::Ready(Some(bytes))) => {
payload.tx.feed_data(bytes); payload.tx.feed_data(bytes);
if payload.decoder.is_eof() { if payload.decoder.is_eof() {
payload.tx.feed_eof(); payload.tx.feed_eof();
break true break 'buf true
} }
}, },
Ok(Async::Ready(None)) => { Ok(Async::Ready(None)) => {
payload.tx.feed_eof(); payload.tx.feed_eof();
break true break 'buf true
},
Ok(Async::NotReady) => {
// if buffer is full then
// socket still can contain more data
if not_ready {
return Ok(Async::NotReady)
}
continue 'buf
}, },
Ok(Async::NotReady) =>
return Ok(Async::NotReady),
Err(err) => { Err(err) => {
payload.tx.set_error(err.into()); payload.tx.set_error(err.into());
return Err(ReaderError::Payload) return Err(ReaderError::Payload)
} }
} }
} }
}
} else { } else {
false false
} }
@@ -445,16 +454,13 @@ impl Reader {
if done { self.payload = None } if done { self.payload = None }
// if buf is empty parse_message will always return NotReady, let's avoid that // if buf is empty parse_message will always return NotReady, let's avoid that
let read = if buf.is_empty() { if buf.is_empty() {
match utils::read_from_io(io, buf) { match utils::read_from_io(io, buf) {
Ok(Async::Ready(0)) => return Err(ReaderError::Disconnect), Ok(Async::Ready(0)) => return Err(ReaderError::Disconnect),
Ok(Async::Ready(_)) => (), Ok(Async::Ready(_)) => (),
Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => return Err(ReaderError::Error(err.into())) Err(err) => return Err(ReaderError::Error(err.into()))
} }
false
} else {
true
}; };
loop { loop {
@@ -470,11 +476,10 @@ impl Reader {
return Ok(Async::Ready(msg)); return Ok(Async::Ready(msg));
}, },
Async::NotReady => { Async::NotReady => {
if buf.capacity() >= MAX_BUFFER_SIZE { if buf.len() >= MAX_BUFFER_SIZE {
error!("MAX_BUFFER_SIZE unprocessed data reached, closing"); error!("MAX_BUFFER_SIZE unprocessed data reached, closing");
return Err(ReaderError::Error(ParseError::TooLarge)); return Err(ReaderError::Error(ParseError::TooLarge));
} }
if read || buf.remaining_mut() == 0 {
match utils::read_from_io(io, buf) { match utils::read_from_io(io, buf) {
Ok(Async::Ready(0)) => { Ok(Async::Ready(0)) => {
debug!("Ignored premature client disconnection"); debug!("Ignored premature client disconnection");
@@ -484,9 +489,6 @@ impl Reader {
Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => return Err(ReaderError::Error(err.into())), Err(err) => return Err(ReaderError::Error(err.into())),
} }
} else {
return Ok(Async::NotReady)
}
}, },
} }
} }

View File

@@ -63,7 +63,7 @@ pub(crate) struct WorkerSettings<H> {
bytes: Rc<SharedBytesPool>, bytes: Rc<SharedBytesPool>,
messages: Rc<helpers::SharedMessagePool>, messages: Rc<helpers::SharedMessagePool>,
channels: Cell<usize>, channels: Cell<usize>,
node: Node<()>, node: Box<Node<()>>,
} }
impl<H> WorkerSettings<H> { impl<H> WorkerSettings<H> {
@@ -75,7 +75,7 @@ impl<H> WorkerSettings<H> {
bytes: Rc::new(SharedBytesPool::new()), bytes: Rc::new(SharedBytesPool::new()),
messages: Rc::new(helpers::SharedMessagePool::new()), messages: Rc::new(helpers::SharedMessagePool::new()),
channels: Cell::new(0), channels: Cell::new(0),
node: Node::head(), node: Box::new(Node::head()),
} }
} }

View File

@@ -697,7 +697,7 @@ fn create_tcp_listener(addr: net::SocketAddr, backlog: i32) -> io::Result<net::T
net::SocketAddr::V4(_) => TcpBuilder::new_v4()?, net::SocketAddr::V4(_) => TcpBuilder::new_v4()?,
net::SocketAddr::V6(_) => TcpBuilder::new_v6()?, net::SocketAddr::V6(_) => TcpBuilder::new_v6()?,
}; };
builder.bind(addr)?;
builder.reuse_address(true)?; builder.reuse_address(true)?;
builder.bind(addr)?;
Ok(builder.listen(backlog)?) Ok(builder.listen(backlog)?)
} }

View File

@@ -94,6 +94,7 @@ fn test_start() {
} }
#[test] #[test]
#[cfg(unix)]
fn test_shutdown() { fn test_shutdown() {
let _ = test::TestServer::unused_addr(); let _ = test::TestServer::unused_addr();
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();