mirror of
https://github.com/fafhrd91/actix-web
synced 2025-08-16 10:59:01 +02:00
Compare commits
10 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
ab978a18ff | ||
|
327df159c6 | ||
|
2ccbd5fa18 | ||
|
058630d041 | ||
|
f456be0309 | ||
|
9bd6cb03ac | ||
|
16afeda79c | ||
|
83fcdfd91f | ||
|
8f94ae41cc | ||
|
4e41347de8 |
@@ -1,5 +1,14 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
|
## 0.4.3 (2018-03-03)
|
||||||
|
|
||||||
|
* Fix request body read bug
|
||||||
|
|
||||||
|
* Fix segmentation fault #79
|
||||||
|
|
||||||
|
* Set reuse address before bind #90
|
||||||
|
|
||||||
|
|
||||||
## 0.4.2 (2018-03-02)
|
## 0.4.2 (2018-03-02)
|
||||||
|
|
||||||
* Better naming for websockets implementation
|
* Better naming for websockets implementation
|
||||||
|
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actix-web"
|
name = "actix-web"
|
||||||
version = "0.4.2"
|
version = "0.4.3"
|
||||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||||
description = "Actix web is a small, pragmatic, extremely fast, web framework for Rust."
|
description = "Actix web is a small, pragmatic, extremely fast, web framework for Rust."
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
|
@@ -2,7 +2,7 @@ use std::mem;
|
|||||||
use httparse;
|
use httparse;
|
||||||
use http::{Version, HttpTryFrom, HeaderMap, StatusCode};
|
use http::{Version, HttpTryFrom, HeaderMap, StatusCode};
|
||||||
use http::header::{self, HeaderName, HeaderValue};
|
use http::header::{self, HeaderName, HeaderValue};
|
||||||
use bytes::{Bytes, BytesMut, BufMut};
|
use bytes::{Bytes, BytesMut};
|
||||||
use futures::{Poll, Async};
|
use futures::{Poll, Async};
|
||||||
|
|
||||||
use error::{ParseError, PayloadError};
|
use error::{ParseError, PayloadError};
|
||||||
@@ -37,7 +37,7 @@ impl HttpResponseParser {
|
|||||||
where T: IoStream
|
where T: IoStream
|
||||||
{
|
{
|
||||||
// if buf is empty parse_message will always return NotReady, let's avoid that
|
// if buf is empty parse_message will always return NotReady, let's avoid that
|
||||||
let read = if buf.is_empty() {
|
if buf.is_empty() {
|
||||||
match utils::read_from_io(io, buf) {
|
match utils::read_from_io(io, buf) {
|
||||||
Ok(Async::Ready(0)) =>
|
Ok(Async::Ready(0)) =>
|
||||||
return Err(HttpResponseParserError::Disconnect),
|
return Err(HttpResponseParserError::Disconnect),
|
||||||
@@ -47,13 +47,12 @@ impl HttpResponseParser {
|
|||||||
Err(err) =>
|
Err(err) =>
|
||||||
return Err(HttpResponseParserError::Error(err.into()))
|
return Err(HttpResponseParserError::Error(err.into()))
|
||||||
}
|
}
|
||||||
false
|
}
|
||||||
} else {
|
|
||||||
true
|
|
||||||
};
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match HttpResponseParser::parse_message(buf).map_err(HttpResponseParserError::Error)? {
|
match HttpResponseParser::parse_message(buf)
|
||||||
|
.map_err(HttpResponseParserError::Error)?
|
||||||
|
{
|
||||||
Async::Ready((msg, decoder)) => {
|
Async::Ready((msg, decoder)) => {
|
||||||
self.decoder = decoder;
|
self.decoder = decoder;
|
||||||
return Ok(Async::Ready(msg));
|
return Ok(Async::Ready(msg));
|
||||||
@@ -62,7 +61,6 @@ impl HttpResponseParser {
|
|||||||
if buf.capacity() >= MAX_BUFFER_SIZE {
|
if buf.capacity() >= MAX_BUFFER_SIZE {
|
||||||
return Err(HttpResponseParserError::Error(ParseError::TooLarge));
|
return Err(HttpResponseParserError::Error(ParseError::TooLarge));
|
||||||
}
|
}
|
||||||
if read || buf.remaining_mut() == 0 {
|
|
||||||
match utils::read_from_io(io, buf) {
|
match utils::read_from_io(io, buf) {
|
||||||
Ok(Async::Ready(0)) =>
|
Ok(Async::Ready(0)) =>
|
||||||
return Err(HttpResponseParserError::Disconnect),
|
return Err(HttpResponseParserError::Disconnect),
|
||||||
@@ -71,9 +69,6 @@ impl HttpResponseParser {
|
|||||||
Err(err) =>
|
Err(err) =>
|
||||||
return Err(HttpResponseParserError::Error(err.into())),
|
return Err(HttpResponseParserError::Error(err.into())),
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
return Ok(Async::NotReady)
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -84,25 +79,34 @@ impl HttpResponseParser {
|
|||||||
where T: IoStream
|
where T: IoStream
|
||||||
{
|
{
|
||||||
if self.decoder.is_some() {
|
if self.decoder.is_some() {
|
||||||
|
loop {
|
||||||
// read payload
|
// read payload
|
||||||
match utils::read_from_io(io, buf) {
|
let not_ready = match utils::read_from_io(io, buf) {
|
||||||
Ok(Async::Ready(0)) => {
|
Ok(Async::Ready(0)) => {
|
||||||
if buf.is_empty() {
|
if buf.is_empty() {
|
||||||
return Err(PayloadError::Incomplete)
|
return Err(PayloadError::Incomplete)
|
||||||
}
|
}
|
||||||
|
true
|
||||||
}
|
}
|
||||||
Err(err) => return Err(err.into()),
|
Err(err) => return Err(err.into()),
|
||||||
_ => (),
|
Ok(Async::NotReady) => true,
|
||||||
}
|
_ => false,
|
||||||
|
};
|
||||||
|
|
||||||
match self.decoder.as_mut().unwrap().decode(buf) {
|
match self.decoder.as_mut().unwrap().decode(buf) {
|
||||||
Ok(Async::Ready(Some(b))) => Ok(Async::Ready(Some(b))),
|
Ok(Async::Ready(Some(b))) =>
|
||||||
|
return Ok(Async::Ready(Some(b))),
|
||||||
Ok(Async::Ready(None)) => {
|
Ok(Async::Ready(None)) => {
|
||||||
self.decoder.take();
|
self.decoder.take();
|
||||||
Ok(Async::Ready(None))
|
return Ok(Async::Ready(None))
|
||||||
|
}
|
||||||
|
Ok(Async::NotReady) => {
|
||||||
|
if not_ready {
|
||||||
|
return Ok(Async::NotReady)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => return Err(err.into()),
|
||||||
}
|
}
|
||||||
Ok(Async::NotReady) => Ok(Async::NotReady),
|
|
||||||
Err(err) => Err(err.into()),
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
Ok(Async::Ready(None))
|
Ok(Async::Ready(None))
|
||||||
|
@@ -18,15 +18,6 @@ enum HttpProtocol<T: IoStream, H: 'static> {
|
|||||||
Unknown(Rc<WorkerSettings<H>>, Option<SocketAddr>, T, BytesMut),
|
Unknown(Rc<WorkerSettings<H>>, Option<SocketAddr>, T, BytesMut),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: IoStream, H: 'static> HttpProtocol<T, H> {
|
|
||||||
fn is_unknown(&self) -> bool {
|
|
||||||
match *self {
|
|
||||||
HttpProtocol::Unknown(_, _, _, _) => true,
|
|
||||||
_ => false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
enum ProtocolKind {
|
enum ProtocolKind {
|
||||||
Http1,
|
Http1,
|
||||||
Http2,
|
Http2,
|
||||||
@@ -44,15 +35,14 @@ impl<T, H> HttpChannel<T, H> where T: IoStream, H: HttpHandler + 'static
|
|||||||
io: T, peer: Option<SocketAddr>, http2: bool) -> HttpChannel<T, H>
|
io: T, peer: Option<SocketAddr>, http2: bool) -> HttpChannel<T, H>
|
||||||
{
|
{
|
||||||
settings.add_channel();
|
settings.add_channel();
|
||||||
|
|
||||||
if http2 {
|
if http2 {
|
||||||
HttpChannel {
|
HttpChannel {
|
||||||
node: None,
|
node: None, proto: Some(HttpProtocol::H2(
|
||||||
proto: Some(HttpProtocol::H2(
|
|
||||||
h2::Http2::new(settings, io, peer, Bytes::new()))) }
|
h2::Http2::new(settings, io, peer, Bytes::new()))) }
|
||||||
} else {
|
} else {
|
||||||
HttpChannel {
|
HttpChannel {
|
||||||
node: None,
|
node: None, proto: Some(HttpProtocol::Unknown(
|
||||||
proto: Some(HttpProtocol::Unknown(
|
|
||||||
settings, peer, io, BytesMut::with_capacity(4096))) }
|
settings, peer, io, BytesMut::with_capacity(4096))) }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -78,15 +68,18 @@ impl<T, H> Future for HttpChannel<T, H> where T: IoStream, H: HttpHandler + 'sta
|
|||||||
type Error = ();
|
type Error = ();
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
if !self.proto.as_ref().map(|p| p.is_unknown()).unwrap_or(false) && self.node.is_none() {
|
if !self.node.is_none() {
|
||||||
self.node = Some(Node::new(self));
|
let el = self as *mut _;
|
||||||
match self.proto {
|
self.node = Some(Node::new(el));
|
||||||
|
let _ = match self.proto {
|
||||||
Some(HttpProtocol::H1(ref mut h1)) =>
|
Some(HttpProtocol::H1(ref mut h1)) =>
|
||||||
h1.settings().head().insert(self.node.as_ref().unwrap()),
|
self.node.as_ref().map(|n| h1.settings().head().insert(n)),
|
||||||
Some(HttpProtocol::H2(ref mut h2)) =>
|
Some(HttpProtocol::H2(ref mut h2)) =>
|
||||||
h2.settings().head().insert(self.node.as_ref().unwrap()),
|
self.node.as_ref().map(|n| h2.settings().head().insert(n)),
|
||||||
_ => (),
|
Some(HttpProtocol::Unknown(ref mut settings, _, _, _)) =>
|
||||||
}
|
self.node.as_ref().map(|n| settings.head().insert(n)),
|
||||||
|
None => unreachable!(),
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
let kind = match self.proto {
|
let kind = match self.proto {
|
||||||
@@ -95,7 +88,7 @@ impl<T, H> Future for HttpChannel<T, H> where T: IoStream, H: HttpHandler + 'sta
|
|||||||
match result {
|
match result {
|
||||||
Ok(Async::Ready(())) | Err(_) => {
|
Ok(Async::Ready(())) | Err(_) => {
|
||||||
h1.settings().remove_channel();
|
h1.settings().remove_channel();
|
||||||
self.node.as_ref().unwrap().remove();
|
self.node.as_mut().map(|n| n.remove());
|
||||||
},
|
},
|
||||||
_ => (),
|
_ => (),
|
||||||
}
|
}
|
||||||
@@ -106,7 +99,7 @@ impl<T, H> Future for HttpChannel<T, H> where T: IoStream, H: HttpHandler + 'sta
|
|||||||
match result {
|
match result {
|
||||||
Ok(Async::Ready(())) | Err(_) => {
|
Ok(Async::Ready(())) | Err(_) => {
|
||||||
h2.settings().remove_channel();
|
h2.settings().remove_channel();
|
||||||
self.node.as_ref().unwrap().remove();
|
self.node.as_mut().map(|n| n.remove());
|
||||||
},
|
},
|
||||||
_ => (),
|
_ => (),
|
||||||
}
|
}
|
||||||
@@ -117,6 +110,7 @@ impl<T, H> Future for HttpChannel<T, H> where T: IoStream, H: HttpHandler + 'sta
|
|||||||
Ok(Async::Ready(0)) | Err(_) => {
|
Ok(Async::Ready(0)) | Err(_) => {
|
||||||
debug!("Ignored premature client disconnection");
|
debug!("Ignored premature client disconnection");
|
||||||
settings.remove_channel();
|
settings.remove_channel();
|
||||||
|
self.node.as_mut().map(|n| n.remove());
|
||||||
return Err(())
|
return Err(())
|
||||||
},
|
},
|
||||||
_ => (),
|
_ => (),
|
||||||
@@ -163,11 +157,11 @@ pub(crate) struct Node<T>
|
|||||||
|
|
||||||
impl<T> Node<T>
|
impl<T> Node<T>
|
||||||
{
|
{
|
||||||
fn new(el: &mut T) -> Self {
|
fn new(el: *mut T) -> Self {
|
||||||
Node {
|
Node {
|
||||||
next: None,
|
next: None,
|
||||||
prev: None,
|
prev: None,
|
||||||
element: el as *mut _,
|
element: el,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -186,13 +180,14 @@ impl<T> Node<T>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn remove(&self) {
|
fn remove(&mut self) {
|
||||||
#[allow(mutable_transmutes)]
|
|
||||||
unsafe {
|
unsafe {
|
||||||
if let Some(ref prev) = self.prev {
|
self.element = ptr::null_mut();
|
||||||
let p: &mut Node<()> = mem::transmute(prev.as_ref().unwrap());
|
let next = self.next.take();
|
||||||
let slf: &mut Node<T> = mem::transmute(self);
|
let mut prev = self.prev.take();
|
||||||
p.next = slf.next.take();
|
|
||||||
|
if let Some(ref mut prev) = prev {
|
||||||
|
prev.as_mut().unwrap().next = next;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -10,7 +10,7 @@ use actix::Arbiter;
|
|||||||
use httparse;
|
use httparse;
|
||||||
use http::{Uri, Method, Version, HttpTryFrom, HeaderMap};
|
use http::{Uri, Method, Version, HttpTryFrom, HeaderMap};
|
||||||
use http::header::{self, HeaderName, HeaderValue};
|
use http::header::{self, HeaderName, HeaderValue};
|
||||||
use bytes::{Bytes, BytesMut, BufMut};
|
use bytes::{Bytes, BytesMut};
|
||||||
use futures::{Future, Poll, Async};
|
use futures::{Future, Poll, Async};
|
||||||
use tokio_core::reactor::Timeout;
|
use tokio_core::reactor::Timeout;
|
||||||
|
|
||||||
@@ -402,42 +402,51 @@ impl Reader {
|
|||||||
// read payload
|
// read payload
|
||||||
let done = {
|
let done = {
|
||||||
if let Some(ref mut payload) = self.payload {
|
if let Some(ref mut payload) = self.payload {
|
||||||
match utils::read_from_io(io, buf) {
|
'buf: loop {
|
||||||
|
let not_ready = match utils::read_from_io(io, buf) {
|
||||||
Ok(Async::Ready(0)) => {
|
Ok(Async::Ready(0)) => {
|
||||||
payload.tx.set_error(PayloadError::Incomplete);
|
payload.tx.set_error(PayloadError::Incomplete);
|
||||||
|
|
||||||
// http channel should not deal with payload errors
|
// http channel should not deal with payload errors
|
||||||
return Err(ReaderError::Payload)
|
return Err(ReaderError::Payload)
|
||||||
},
|
},
|
||||||
|
Ok(Async::NotReady) => true,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
payload.tx.set_error(err.into());
|
payload.tx.set_error(err.into());
|
||||||
|
|
||||||
// http channel should not deal with payload errors
|
// http channel should not deal with payload errors
|
||||||
return Err(ReaderError::Payload)
|
return Err(ReaderError::Payload)
|
||||||
}
|
}
|
||||||
_ => (),
|
_ => false,
|
||||||
}
|
};
|
||||||
loop {
|
loop {
|
||||||
match payload.decoder.decode(buf) {
|
match payload.decoder.decode(buf) {
|
||||||
Ok(Async::Ready(Some(bytes))) => {
|
Ok(Async::Ready(Some(bytes))) => {
|
||||||
payload.tx.feed_data(bytes);
|
payload.tx.feed_data(bytes);
|
||||||
if payload.decoder.is_eof() {
|
if payload.decoder.is_eof() {
|
||||||
payload.tx.feed_eof();
|
payload.tx.feed_eof();
|
||||||
break true
|
break 'buf true
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Ok(Async::Ready(None)) => {
|
Ok(Async::Ready(None)) => {
|
||||||
payload.tx.feed_eof();
|
payload.tx.feed_eof();
|
||||||
break true
|
break 'buf true
|
||||||
|
},
|
||||||
|
Ok(Async::NotReady) => {
|
||||||
|
// if buffer is full then
|
||||||
|
// socket still can contain more data
|
||||||
|
if not_ready {
|
||||||
|
return Ok(Async::NotReady)
|
||||||
|
}
|
||||||
|
continue 'buf
|
||||||
},
|
},
|
||||||
Ok(Async::NotReady) =>
|
|
||||||
return Ok(Async::NotReady),
|
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
payload.tx.set_error(err.into());
|
payload.tx.set_error(err.into());
|
||||||
return Err(ReaderError::Payload)
|
return Err(ReaderError::Payload)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
@@ -445,16 +454,13 @@ impl Reader {
|
|||||||
if done { self.payload = None }
|
if done { self.payload = None }
|
||||||
|
|
||||||
// if buf is empty parse_message will always return NotReady, let's avoid that
|
// if buf is empty parse_message will always return NotReady, let's avoid that
|
||||||
let read = if buf.is_empty() {
|
if buf.is_empty() {
|
||||||
match utils::read_from_io(io, buf) {
|
match utils::read_from_io(io, buf) {
|
||||||
Ok(Async::Ready(0)) => return Err(ReaderError::Disconnect),
|
Ok(Async::Ready(0)) => return Err(ReaderError::Disconnect),
|
||||||
Ok(Async::Ready(_)) => (),
|
Ok(Async::Ready(_)) => (),
|
||||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||||
Err(err) => return Err(ReaderError::Error(err.into()))
|
Err(err) => return Err(ReaderError::Error(err.into()))
|
||||||
}
|
}
|
||||||
false
|
|
||||||
} else {
|
|
||||||
true
|
|
||||||
};
|
};
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
@@ -470,11 +476,10 @@ impl Reader {
|
|||||||
return Ok(Async::Ready(msg));
|
return Ok(Async::Ready(msg));
|
||||||
},
|
},
|
||||||
Async::NotReady => {
|
Async::NotReady => {
|
||||||
if buf.capacity() >= MAX_BUFFER_SIZE {
|
if buf.len() >= MAX_BUFFER_SIZE {
|
||||||
error!("MAX_BUFFER_SIZE unprocessed data reached, closing");
|
error!("MAX_BUFFER_SIZE unprocessed data reached, closing");
|
||||||
return Err(ReaderError::Error(ParseError::TooLarge));
|
return Err(ReaderError::Error(ParseError::TooLarge));
|
||||||
}
|
}
|
||||||
if read || buf.remaining_mut() == 0 {
|
|
||||||
match utils::read_from_io(io, buf) {
|
match utils::read_from_io(io, buf) {
|
||||||
Ok(Async::Ready(0)) => {
|
Ok(Async::Ready(0)) => {
|
||||||
debug!("Ignored premature client disconnection");
|
debug!("Ignored premature client disconnection");
|
||||||
@@ -484,9 +489,6 @@ impl Reader {
|
|||||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||||
Err(err) => return Err(ReaderError::Error(err.into())),
|
Err(err) => return Err(ReaderError::Error(err.into())),
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
return Ok(Async::NotReady)
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -63,7 +63,7 @@ pub(crate) struct WorkerSettings<H> {
|
|||||||
bytes: Rc<SharedBytesPool>,
|
bytes: Rc<SharedBytesPool>,
|
||||||
messages: Rc<helpers::SharedMessagePool>,
|
messages: Rc<helpers::SharedMessagePool>,
|
||||||
channels: Cell<usize>,
|
channels: Cell<usize>,
|
||||||
node: Node<()>,
|
node: Box<Node<()>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<H> WorkerSettings<H> {
|
impl<H> WorkerSettings<H> {
|
||||||
@@ -75,7 +75,7 @@ impl<H> WorkerSettings<H> {
|
|||||||
bytes: Rc::new(SharedBytesPool::new()),
|
bytes: Rc::new(SharedBytesPool::new()),
|
||||||
messages: Rc::new(helpers::SharedMessagePool::new()),
|
messages: Rc::new(helpers::SharedMessagePool::new()),
|
||||||
channels: Cell::new(0),
|
channels: Cell::new(0),
|
||||||
node: Node::head(),
|
node: Box::new(Node::head()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -697,7 +697,7 @@ fn create_tcp_listener(addr: net::SocketAddr, backlog: i32) -> io::Result<net::T
|
|||||||
net::SocketAddr::V4(_) => TcpBuilder::new_v4()?,
|
net::SocketAddr::V4(_) => TcpBuilder::new_v4()?,
|
||||||
net::SocketAddr::V6(_) => TcpBuilder::new_v6()?,
|
net::SocketAddr::V6(_) => TcpBuilder::new_v6()?,
|
||||||
};
|
};
|
||||||
builder.bind(addr)?;
|
|
||||||
builder.reuse_address(true)?;
|
builder.reuse_address(true)?;
|
||||||
|
builder.bind(addr)?;
|
||||||
Ok(builder.listen(backlog)?)
|
Ok(builder.listen(backlog)?)
|
||||||
}
|
}
|
||||||
|
@@ -94,6 +94,7 @@ fn test_start() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
#[cfg(unix)]
|
||||||
fn test_shutdown() {
|
fn test_shutdown() {
|
||||||
let _ = test::TestServer::unused_addr();
|
let _ = test::TestServer::unused_addr();
|
||||||
let (tx, rx) = mpsc::channel();
|
let (tx, rx) = mpsc::channel();
|
||||||
|
Reference in New Issue
Block a user