1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-07-16 22:55:47 +02:00

Compare commits

...

51 Commits

Author SHA1 Message Date
Nikolay Kim
f40153fca4 fix node::insert() method, missing next element 2018-09-17 11:39:03 -07:00
Nikolay Kim
764103566d update changes 2018-09-17 10:48:37 -07:00
Nikolay Kim
bfb2f2e9e1 fix node.remove(), update next node pointer 2018-09-17 10:25:45 -07:00
Nikolay Kim
599e6b3385 refactor channel node remove operation 2018-09-17 05:29:07 -07:00
Nikolay Kim
03e318f446 update changes 2018-09-15 17:10:53 -07:00
Nikolay Kim
7449884ce3 fix wrong error message for path deserialize for i32 #510 2018-09-15 17:09:07 -07:00
Nikolay Kim
bbe69e5b8d update version 2018-09-15 10:00:54 -07:00
Nikolay Kim
9d1eefc38f use 5 seconds keep-alive timer by default 2018-09-15 09:57:54 -07:00
Nikolay Kim
d65c72b44d use server keep-alive timer as slow request timer 2018-09-15 09:55:38 -07:00
Nikolay Kim
c3f8b5cf22 clippy warnings 2018-09-11 11:25:32 -07:00
Nikolay Kim
70a3f317d3 fix failing requests to test server #508 2018-09-11 11:24:05 -07:00
Nikolay Kim
513c8ec1ce Merge pull request #505 from Neopallium/master
Fix issue with HttpChannel linked list.
2018-09-11 11:18:33 -07:00
Robert G. Jakabosky
04608b2ea6 Update changes. 2018-09-12 00:27:15 +08:00
Robert G. Jakabosky
70b45659e2 Make Node's traverse method take a closure instead of calling shutdown on each HttpChannel. 2018-09-12 00:27:15 +08:00
Robert G. Jakabosky
e0ae6b10cd Fix bug with HttpChannel linked list. 2018-09-12 00:27:15 +08:00
Maciej Piechotka
003b05b095 Don't ignore errors in std::fmt::Debug implementations (#506) 2018-09-11 14:57:55 +03:00
Nikolay Kim
cdb57b840e prepare release 2018-09-07 20:47:54 -07:00
Nikolay Kim
002bb24b26 unhide SessionBackend and SessionImpl traits and cleanup warnings 2018-09-07 20:46:43 -07:00
Nikolay Kim
51982b3fec Merge pull request #503 from uzytkownik/route-regex
Refactor resource route parsing to allow repetition in the regexes
2018-09-07 20:19:31 -07:00
Maciej Piechotka
4251b0bc10 Refactor resource route parsing to allow repetition in the regexes 2018-09-06 08:51:55 +02:00
Nikolay Kim
42f3773bec update changes 2018-09-05 09:03:58 -07:00
Jan Michael Auer
86fdbb47a5 Fix system_exit in HttpServer (#501) 2018-09-05 10:41:23 +02:00
Nikolay Kim
4ca9fd2ad1 remove debug print 2018-09-03 22:09:12 -07:00
Nikolay Kim
f0f67072ae Read client response until eof if connection header set to close #464 2018-09-03 21:35:59 -07:00
Nikolay Kim
24d1228943 simplify handler path processing 2018-09-03 11:28:47 -07:00
Nikolay Kim
b7a73e0a4f fix Scope::handler doc test 2018-09-02 08:51:26 -07:00
Nikolay Kim
968c81e267 Handling scoped paths without leading slashes #460 2018-09-02 08:14:54 -07:00
Nikolay Kim
d5957a8466 Merge branch 'master' of https://github.com/actix/actix-web 2018-09-02 07:47:45 -07:00
Nikolay Kim
f2f05e7715 allow to register handlers on scope level #465 2018-09-02 07:47:19 -07:00
Markus Unterwaditzer
3439f55288 doc: Add example for using custom nativetls connector (#497) 2018-09-01 18:13:52 +03:00
Robert Gabriel Jakabosky
0425e2776f Fix Issue #490 (#498)
* Add failing testcase for HTTP 404 response with no reason text.

* Include canonical reason test for HTTP error responses.

* Don't send a reason for unknown status codes.
2018-09-01 12:00:32 +03:00
Nikolay Kim
6464f96f8b Merge branch 'master' of https://github.com/actix/actix-web 2018-08-31 18:56:53 -07:00
Nikolay Kim
a2b170fec9 fmt 2018-08-31 18:56:21 -07:00
Nikolay Kim
0b42cae082 update tests 2018-08-31 18:54:19 -07:00
Nikolay Kim
c313c003a4 Fix typo 2018-08-31 17:45:29 -07:00
Nikolay Kim
3fa23f5e10 update version 2018-08-31 17:25:15 -07:00
Nikolay Kim
2d51831899 handle socket read disconnect 2018-08-31 17:24:13 -07:00
Nikolay Kim
e59abfd716 Merge pull request #496 from Neopallium/master
Fix issue with 'Connection: close' in ClientRequest
2018-08-31 17:17:39 -07:00
Robert G. Jakabosky
66881d7dd1 If buffer is empty, read more data before calling parser. 2018-09-01 02:25:05 +08:00
Robert G. Jakabosky
a42a8a2321 Add some comments to clarify logic. 2018-09-01 02:15:36 +08:00
Robert G. Jakabosky
2341656173 Simplify buffer reading logic. Remove duplicate code. 2018-09-01 01:41:38 +08:00
Robert G. Jakabosky
487519acec Add client test for 'Connection: close' as reported in issue #495 2018-09-01 00:34:19 +08:00
Robert Gabriel Jakabosky
af6caa92c8 Merge branch 'master' into master 2018-09-01 00:17:34 +08:00
Robert G. Jakabosky
3ccbce6bc8 Fix issue with 'Connection: close' in ClientRequest 2018-09-01 00:08:53 +08:00
Armin Ronacher
797b52ecbf Update CHANGES.md 2018-08-29 20:58:23 +02:00
Markus Unterwaditzer
4bab50c861 Add ability to pass a custom TlsConnector (#491) 2018-08-29 20:53:31 +02:00
Nikolay Kim
5906971b6d Merge pull request #483 from Neopallium/master
Fix bug with client disconnect immediately after receiving http request.
2018-08-26 10:15:25 -07:00
Robert G. Jakabosky
8393d09a0f Fix tests. 2018-08-27 00:31:31 +08:00
Robert G. Jakabosky
c3ae9997fc Fix bug with http1 client disconnects. 2018-08-26 22:21:05 +08:00
Nikolay Kim
d39dcc58cd Merge pull request #482 from 0x1793d1/master
Fix server startup log message
2018-08-24 20:53:45 -07:00
0x1793d1
471a3e9806 Fix server startup log message 2018-08-24 23:21:32 +02:00
28 changed files with 747 additions and 455 deletions

View File

@@ -1,5 +1,63 @@
# Changes
## [0.7.8] - 2018-09-17
### Added
* Use server `Keep-Alive` setting as slow request timeout #439
### Changed
* Use 5 seconds keep-alive timer by default.
### Fixed
* Fixed wrong error message for i16 type #510
## [0.7.7] - 2018-09-11
### Fixed
* Fix linked list of HttpChannels #504
* Fix requests to TestServer fail #508
## [0.7.6] - 2018-09-07
### Fixed
* Fix system_exit in HttpServer #501
* Fix parsing of route param containin regexes with repetition #500
### Changes
* Unhide `SessionBackend` and `SessionImpl` traits #455
## [0.7.5] - 2018-09-04
### Added
* Added the ability to pass a custom `TlsConnector`.
* Allow to register handlers on scope level #465
### Fixed
* Handle socket read disconnect
* Handling scoped paths without leading slashes #460
### Changed
* Read client response until eof if connection header set to close #464
## [0.7.4] - 2018-08-23
### Added

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-web"
version = "0.7.4"
version = "0.7.8"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix web is a simple, pragmatic and extremely fast web framework for Rust."
readme = "README.md"
@@ -101,6 +101,7 @@ tokio-io = "0.1"
tokio-tcp = "0.1"
tokio-timer = "0.2"
tokio-reactor = "0.1"
tokio-current-thread = "0.1"
# native-tls
native-tls = { version="0.2", optional = true }

View File

@@ -447,11 +447,8 @@ where
{
let mut path = path.trim().trim_right_matches('/').to_owned();
if !path.is_empty() && !path.starts_with('/') {
path.insert(0, '/')
}
if path.len() > 1 && path.ends_with('/') {
path.pop();
}
path.insert(0, '/');
};
self.parts
.as_mut()
.expect("Use after finish")

View File

@@ -17,57 +17,34 @@ use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::Delay;
#[cfg(feature = "alpn")]
use openssl::ssl::{Error as OpensslError, SslConnector, SslMethod};
#[cfg(feature = "alpn")]
use tokio_openssl::SslConnectorExt;
use {
openssl::ssl::{Error as SslError, SslConnector, SslMethod},
tokio_openssl::SslConnectorExt,
};
#[cfg(all(feature = "tls", not(feature = "alpn")))]
use native_tls::{Error as TlsError, TlsConnector as NativeTlsConnector};
#[cfg(all(feature = "tls", not(feature = "alpn")))]
use tokio_tls::{TlsConnector};
use {
native_tls::{Error as SslError, TlsConnector as NativeTlsConnector},
tokio_tls::TlsConnector as SslConnector,
};
#[cfg(
all(
feature = "rust-tls",
not(any(feature = "alpn", feature = "tls"))
)
)]
use rustls::ClientConfig;
#[cfg(
all(
feature = "rust-tls",
not(any(feature = "alpn", feature = "tls"))
)
)]
use std::io::Error as TLSError;
#[cfg(
all(
feature = "rust-tls",
not(any(feature = "alpn", feature = "tls"))
)
)]
use std::sync::Arc;
#[cfg(
all(
feature = "rust-tls",
not(any(feature = "alpn", feature = "tls"))
)
)]
use tokio_rustls::ClientConfigExt;
#[cfg(
all(
feature = "rust-tls",
not(any(feature = "alpn", feature = "tls"))
)
)]
use webpki::DNSNameRef;
#[cfg(
all(
feature = "rust-tls",
not(any(feature = "alpn", feature = "tls"))
)
)]
use webpki_roots;
#[cfg(all(
feature = "rust-tls",
not(any(feature = "alpn", feature = "tls"))
))]
use {
rustls::ClientConfig, std::io::Error as SslError, std::sync::Arc,
tokio_rustls::ClientConfigExt, webpki::DNSNameRef, webpki_roots,
};
#[cfg(all(
feature = "rust-tls",
not(any(feature = "alpn", feature = "tls"))
))]
type SslConnector = Arc<ClientConfig>;
#[cfg(not(any(feature = "alpn", feature = "tls", feature = "rust-tls")))]
type SslConnector = ();
use server::IoStream;
use {HAS_OPENSSL, HAS_RUSTLS, HAS_TLS};
@@ -173,24 +150,9 @@ pub enum ClientConnectorError {
SslIsNotSupported,
/// SSL error
#[cfg(feature = "alpn")]
#[cfg(any(feature = "tls", feature = "alpn", feature = "rust-tls"))]
#[fail(display = "{}", _0)]
SslError(#[cause] OpensslError),
/// SSL error
#[cfg(all(feature = "tls", not(feature = "alpn")))]
#[fail(display = "{}", _0)]
SslError(#[cause] TlsError),
/// SSL error
#[cfg(
all(
feature = "rust-tls",
not(any(feature = "alpn", feature = "tls"))
)
)]
#[fail(display = "{}", _0)]
SslError(#[cause] TLSError),
SslError(#[cause] SslError),
/// Resolver error
#[fail(display = "{}", _0)]
@@ -242,17 +204,8 @@ impl Paused {
/// `ClientConnector` type is responsible for transport layer of a
/// client connection.
pub struct ClientConnector {
#[cfg(all(feature = "alpn"))]
#[allow(dead_code)]
connector: SslConnector,
#[cfg(all(feature = "tls", not(feature = "alpn")))]
connector: TlsConnector,
#[cfg(
all(
feature = "rust-tls",
not(any(feature = "alpn", feature = "tls"))
)
)]
connector: Arc<ClientConfig>,
stats: ClientConnectorStats,
subscriber: Option<Recipient<ClientConnectorStats>>,
@@ -293,71 +246,36 @@ impl SystemService for ClientConnector {}
impl Default for ClientConnector {
fn default() -> ClientConnector {
#[cfg(all(feature = "alpn"))]
{
let builder = SslConnector::builder(SslMethod::tls()).unwrap();
ClientConnector::with_connector(builder.build())
}
#[cfg(all(feature = "tls", not(feature = "alpn")))]
{
let (tx, rx) = mpsc::unbounded();
let builder = NativeTlsConnector::builder();
ClientConnector {
stats: ClientConnectorStats::default(),
subscriber: None,
acq_tx: tx,
acq_rx: Some(rx),
resolver: None,
connector: builder.build().unwrap().into(),
conn_lifetime: Duration::from_secs(75),
conn_keep_alive: Duration::from_secs(15),
limit: 100,
limit_per_host: 0,
acquired: 0,
acquired_per_host: HashMap::new(),
available: HashMap::new(),
to_close: Vec::new(),
waiters: Some(HashMap::new()),
wait_timeout: None,
paused: Paused::No,
let connector = {
#[cfg(all(feature = "alpn"))]
{
SslConnector::builder(SslMethod::tls()).unwrap().build()
}
}
#[cfg(
all(
#[cfg(all(feature = "tls", not(feature = "alpn")))]
{
NativeTlsConnector::builder().build().unwrap().into()
}
#[cfg(all(
feature = "rust-tls",
not(any(feature = "alpn", feature = "tls"))
)
)]
{
let mut config = ClientConfig::new();
config
.root_store
.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
ClientConnector::with_connector(config)
}
#[cfg(not(any(feature = "alpn", feature = "tls", feature = "rust-tls")))]
{
let (tx, rx) = mpsc::unbounded();
ClientConnector {
stats: ClientConnectorStats::default(),
subscriber: None,
acq_tx: tx,
acq_rx: Some(rx),
resolver: None,
conn_lifetime: Duration::from_secs(75),
conn_keep_alive: Duration::from_secs(15),
limit: 100,
limit_per_host: 0,
acquired: 0,
acquired_per_host: HashMap::new(),
available: HashMap::new(),
to_close: Vec::new(),
waiters: Some(HashMap::new()),
wait_timeout: None,
paused: Paused::No,
))]
{
let mut config = ClientConfig::new();
config
.root_store
.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
Arc::new(config)
}
}
#[cfg(not(any(feature = "alpn", feature = "tls", feature = "rust-tls")))]
{
()
}
};
ClientConnector::with_connector_impl(connector)
}
}
@@ -375,7 +293,6 @@ impl ClientConnector {
/// # extern crate futures;
/// # use futures::{future, Future};
/// # use std::io::Write;
/// # use std::process;
/// # use actix_web::actix::Actor;
/// extern crate openssl;
/// use actix_web::{actix, client::ClientConnector, client::Connect};
@@ -402,35 +319,14 @@ impl ClientConnector {
/// }
/// ```
pub fn with_connector(connector: SslConnector) -> ClientConnector {
let (tx, rx) = mpsc::unbounded();
ClientConnector {
connector,
stats: ClientConnectorStats::default(),
subscriber: None,
acq_tx: tx,
acq_rx: Some(rx),
resolver: None,
conn_lifetime: Duration::from_secs(75),
conn_keep_alive: Duration::from_secs(15),
limit: 100,
limit_per_host: 0,
acquired: 0,
acquired_per_host: HashMap::new(),
available: HashMap::new(),
to_close: Vec::new(),
waiters: Some(HashMap::new()),
wait_timeout: None,
paused: Paused::No,
}
// keep level of indirection for docstrings matching featureflags
Self::with_connector_impl(connector)
}
#[cfg(
all(
feature = "rust-tls",
not(any(feature = "alpn", feature = "tls"))
)
)]
#[cfg(all(
feature = "rust-tls",
not(any(feature = "alpn", feature = "tls"))
))]
/// Create `ClientConnector` actor with custom `SslConnector` instance.
///
/// By default `ClientConnector` uses very a simple SSL configuration.
@@ -441,10 +337,8 @@ impl ClientConnector {
/// # #![cfg(feature = "rust-tls")]
/// # extern crate actix_web;
/// # extern crate futures;
/// # extern crate tokio;
/// # use futures::{future, Future};
/// # use std::io::Write;
/// # use std::process;
/// # use actix_web::actix::Actor;
/// extern crate rustls;
/// extern crate webpki_roots;
@@ -476,10 +370,61 @@ impl ClientConnector {
/// }
/// ```
pub fn with_connector(connector: ClientConfig) -> ClientConnector {
// keep level of indirection for docstrings matching featureflags
Self::with_connector_impl(Arc::new(connector))
}
#[cfg(all(
feature = "tls",
not(any(feature = "alpn", feature = "rust-tls"))
))]
/// Create `ClientConnector` actor with custom `SslConnector` instance.
///
/// By default `ClientConnector` uses very a simple SSL configuration.
/// With `with_connector` method it is possible to use a custom
/// `SslConnector` object.
///
/// ```rust
/// # #![cfg(feature = "tls")]
/// # extern crate actix_web;
/// # extern crate futures;
/// # use futures::{future, Future};
/// # use std::io::Write;
/// # use actix_web::actix::Actor;
/// extern crate native_tls;
/// extern crate webpki_roots;
/// use native_tls::TlsConnector;
/// use actix_web::{actix, client::ClientConnector, client::Connect};
///
/// fn main() {
/// actix::run(|| {
/// let connector = TlsConnector::new().unwrap();
/// let conn = ClientConnector::with_connector(connector.into()).start();
///
/// conn.send(
/// Connect::new("https://www.rust-lang.org").unwrap()) // <- connect to host
/// .map_err(|_| ())
/// .and_then(|res| {
/// if let Ok(mut stream) = res {
/// stream.write_all(b"GET / HTTP/1.0\r\n\r\n").unwrap();
/// }
/// # actix::System::current().stop();
/// Ok(())
/// })
/// });
/// }
/// ```
pub fn with_connector(connector: SslConnector) -> ClientConnector {
// keep level of indirection for docstrings matching featureflags
Self::with_connector_impl(connector)
}
#[inline]
fn with_connector_impl(connector: SslConnector) -> ClientConnector {
let (tx, rx) = mpsc::unbounded();
ClientConnector {
connector: Arc::new(connector),
connector,
stats: ClientConnectorStats::default(),
subscriber: None,
acq_tx: tx,
@@ -853,12 +798,10 @@ impl ClientConnector {
}
}
#[cfg(
all(
feature = "rust-tls",
not(any(feature = "alpn", feature = "tls"))
)
)]
#[cfg(all(
feature = "rust-tls",
not(any(feature = "alpn", feature = "tls"))
))]
match res {
Err(err) => {
let _ = waiter.tx.send(Err(err.into()));
@@ -1344,7 +1287,7 @@ impl AsyncWrite for Connection {
}
#[cfg(feature = "tls")]
use tokio_tls::{TlsStream};
use tokio_tls::TlsStream;
#[cfg(feature = "tls")]
/// This is temp solution untile actix-net migration
@@ -1364,4 +1307,4 @@ impl<Io: IoStream> IoStream for TlsStream<Io> {
fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()> {
self.get_mut().get_mut().set_linger(dur)
}
}
}

View File

@@ -20,6 +20,7 @@ const MAX_HEADERS: usize = 96;
#[derive(Default)]
pub struct HttpResponseParser {
decoder: Option<EncodingDecoder>,
eof: bool, // indicate that we read payload until stream eof
}
#[derive(Debug, Fail)]
@@ -38,43 +39,42 @@ impl HttpResponseParser {
where
T: IoStream,
{
// if buf is empty parse_message will always return NotReady, let's avoid that
if buf.is_empty() {
loop {
// Don't call parser until we have data to parse.
if !buf.is_empty() {
match HttpResponseParser::parse_message(buf)
.map_err(HttpResponseParserError::Error)?
{
Async::Ready((msg, info)) => {
if let Some((decoder, eof)) = info {
self.eof = eof;
self.decoder = Some(decoder);
} else {
self.eof = false;
self.decoder = None;
}
return Ok(Async::Ready(msg));
}
Async::NotReady => {
if buf.capacity() >= MAX_BUFFER_SIZE {
return Err(HttpResponseParserError::Error(
ParseError::TooLarge,
));
}
// Parser needs more data.
}
}
}
// Read some more data into the buffer for the parser.
match io.read_available(buf) {
Ok(Async::Ready(true)) => {
Ok(Async::Ready((false, true))) => {
return Err(HttpResponseParserError::Disconnect)
}
Ok(Async::Ready(false)) => (),
Ok(Async::Ready(_)) => (),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => return Err(HttpResponseParserError::Error(err.into())),
}
}
loop {
match HttpResponseParser::parse_message(buf)
.map_err(HttpResponseParserError::Error)?
{
Async::Ready((msg, decoder)) => {
self.decoder = decoder;
return Ok(Async::Ready(msg));
}
Async::NotReady => {
if buf.capacity() >= MAX_BUFFER_SIZE {
return Err(HttpResponseParserError::Error(ParseError::TooLarge));
}
match io.read_available(buf) {
Ok(Async::Ready(true)) => {
return Err(HttpResponseParserError::Disconnect)
}
Ok(Async::Ready(false)) => (),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => {
return Err(HttpResponseParserError::Error(err.into()))
}
}
}
}
}
}
pub fn parse_payload<T>(
@@ -87,8 +87,8 @@ impl HttpResponseParser {
loop {
// read payload
let (not_ready, stream_finished) = match io.read_available(buf) {
Ok(Async::Ready(true)) => (false, true),
Ok(Async::Ready(false)) => (false, false),
Ok(Async::Ready((_, true))) => (false, true),
Ok(Async::Ready((_, false))) => (false, false),
Ok(Async::NotReady) => (true, false),
Err(err) => return Err(err.into()),
};
@@ -104,7 +104,12 @@ impl HttpResponseParser {
return Ok(Async::NotReady);
}
if stream_finished {
return Err(PayloadError::Incomplete);
// read untile eof?
if self.eof {
return Ok(Async::Ready(None));
} else {
return Err(PayloadError::Incomplete);
}
}
}
Err(err) => return Err(err.into()),
@@ -117,7 +122,7 @@ impl HttpResponseParser {
fn parse_message(
buf: &mut BytesMut,
) -> Poll<(ClientResponse, Option<EncodingDecoder>), ParseError> {
) -> Poll<(ClientResponse, Option<(EncodingDecoder, bool)>), ParseError> {
// Unsafe: we read only this data only after httparse parses headers into.
// performance bump for pipeline benchmarks.
let mut headers: [HeaderIndex; MAX_HEADERS] = unsafe { mem::uninitialized() };
@@ -163,12 +168,12 @@ impl HttpResponseParser {
}
let decoder = if status == StatusCode::SWITCHING_PROTOCOLS {
Some(EncodingDecoder::eof())
Some((EncodingDecoder::eof(), true))
} else if let Some(len) = hdrs.get(header::CONTENT_LENGTH) {
// Content-Length
if let Ok(s) = len.to_str() {
if let Ok(len) = s.parse::<u64>() {
Some(EncodingDecoder::length(len))
Some((EncodingDecoder::length(len), false))
} else {
debug!("illegal Content-Length: {:?}", len);
return Err(ParseError::Header);
@@ -179,7 +184,18 @@ impl HttpResponseParser {
}
} else if chunked(&hdrs)? {
// Chunked encoding
Some(EncodingDecoder::chunked())
Some((EncodingDecoder::chunked(), false))
} else if let Some(value) = hdrs.get(header::CONNECTION) {
let close = if let Ok(s) = value.to_str() {
s == "close"
} else {
false
};
if close {
Some((EncodingDecoder::eof(), true))
} else {
None
}
} else {
None
};

View File

@@ -254,16 +254,16 @@ impl ClientRequest {
impl fmt::Debug for ClientRequest {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let res = writeln!(
writeln!(
f,
"\nClientRequest {:?} {}:{}",
self.version, self.method, self.uri
);
let _ = writeln!(f, " headers:");
)?;
writeln!(f, " headers:")?;
for (key, val) in self.headers.iter() {
let _ = writeln!(f, " {:?}: {:?}", key, val);
writeln!(f, " {:?}: {:?}", key, val)?;
}
res
Ok(())
}
}
@@ -750,16 +750,16 @@ fn parts<'a>(
impl fmt::Debug for ClientRequestBuilder {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if let Some(ref parts) = self.request {
let res = writeln!(
writeln!(
f,
"\nClientRequestBuilder {:?} {}:{}",
parts.version, parts.method, parts.uri
);
let _ = writeln!(f, " headers:");
)?;
writeln!(f, " headers:")?;
for (key, val) in parts.headers.iter() {
let _ = writeln!(f, " {:?}: {:?}", key, val);
writeln!(f, " {:?}: {:?}", key, val)?;
}
res
Ok(())
} else {
write!(f, "ClientRequestBuilder(Consumed)")
}

View File

@@ -95,12 +95,12 @@ impl ClientResponse {
impl fmt::Debug for ClientResponse {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let res = writeln!(f, "\nClientResponse {:?} {}", self.version(), self.status());
let _ = writeln!(f, " headers:");
writeln!(f, "\nClientResponse {:?} {}", self.version(), self.status())?;
writeln!(f, " headers:")?;
for (key, val) in self.headers().iter() {
let _ = writeln!(f, " {:?}: {:?}", key, val);
writeln!(f, " {:?}: {:?}", key, val)?;
}
res
Ok(())
}
}

View File

@@ -175,7 +175,7 @@ impl<'de, S: 'de> Deserializer<'de> for PathDeserializer<'de, S> {
parse_single_value!(deserialize_bool, visit_bool, "bool");
parse_single_value!(deserialize_i8, visit_i8, "i8");
parse_single_value!(deserialize_i16, visit_i16, "i16");
parse_single_value!(deserialize_i32, visit_i32, "i16");
parse_single_value!(deserialize_i32, visit_i32, "i32");
parse_single_value!(deserialize_i64, visit_i64, "i64");
parse_single_value!(deserialize_u8, visit_u8, "u8");
parse_single_value!(deserialize_u16, visit_u16, "u16");

View File

@@ -354,24 +354,24 @@ impl<S> FromRequest<S> for HttpRequest<S> {
impl<S> fmt::Debug for HttpRequest<S> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let res = writeln!(
writeln!(
f,
"\nHttpRequest {:?} {}:{}",
self.version(),
self.method(),
self.path()
);
)?;
if !self.query_string().is_empty() {
let _ = writeln!(f, " query: ?{:?}", self.query_string());
writeln!(f, " query: ?{:?}", self.query_string())?;
}
if !self.match_info().is_empty() {
let _ = writeln!(f, " params: {:?}", self.match_info());
writeln!(f, " params: {:?}", self.match_info())?;
}
let _ = writeln!(f, " headers:");
writeln!(f, " headers:")?;
for (key, val) in self.headers().iter() {
let _ = writeln!(f, " {:?}: {:?}", key, val);
writeln!(f, " {:?}: {:?}", key, val)?;
}
res
Ok(())
}
}

View File

@@ -118,6 +118,7 @@ extern crate parking_lot;
extern crate rand;
extern crate slab;
extern crate tokio;
extern crate tokio_current_thread;
extern crate tokio_io;
extern crate tokio_reactor;
extern crate tokio_tcp;

View File

@@ -270,14 +270,17 @@ impl<S: 'static, T: SessionBackend<S>> Middleware<S> for SessionStorage<T, S> {
}
/// A simple key-value storage interface that is internally used by `Session`.
#[doc(hidden)]
pub trait SessionImpl: 'static {
/// Get session value by key
fn get(&self, key: &str) -> Option<&str>;
/// Set session value
fn set(&mut self, key: &str, value: String);
/// Remove specific key from session
fn remove(&mut self, key: &str);
/// Remove all values from session
fn clear(&mut self);
/// Write session to storage backend.
@@ -285,9 +288,10 @@ pub trait SessionImpl: 'static {
}
/// Session's storage backend trait definition.
#[doc(hidden)]
pub trait SessionBackend<S>: Sized + 'static {
/// Session item
type Session: SessionImpl;
/// Future that reads session
type ReadFuture: Future<Item = Self::Session, Error = Error>;
/// Parse the session from request and load data from a storage backend.

View File

@@ -441,13 +441,13 @@ where
impl<S> fmt::Debug for Field<S> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let res = writeln!(f, "\nMultipartField: {}", self.ct);
let _ = writeln!(f, " boundary: {}", self.inner.borrow().boundary);
let _ = writeln!(f, " headers:");
writeln!(f, "\nMultipartField: {}", self.ct)?;
writeln!(f, " boundary: {}", self.inner.borrow().boundary)?;
writeln!(f, " headers:")?;
for (key, val) in self.headers.iter() {
let _ = writeln!(f, " {:?}: {:?}", key, val);
writeln!(f, " {:?}: {:?}", key, val)?;
}
res
Ok(())
}
}

View File

@@ -236,7 +236,6 @@ macro_rules! FROM_STR {
($type:ty) => {
impl FromParam for $type {
type Err = InternalError<<$type as FromStr>::Err>;
fn from_param(val: &str) -> Result<Self, Self::Err> {
<$type as FromStr>::from_str(val)
.map_err(|e| InternalError::new(e, StatusCode::BAD_REQUEST))

View File

@@ -815,73 +815,70 @@ impl ResourceDef {
Ok(())
}
fn parse(
pattern: &str, for_prefix: bool,
) -> (String, Vec<PatternElement>, bool, usize) {
fn parse_param(pattern: &str) -> (PatternElement, String, &str) {
const DEFAULT_PATTERN: &str = "[^/]+";
let mut re1 = String::from("^");
let mut re2 = String::new();
let mut el = String::new();
let mut in_param = false;
let mut in_param_pattern = false;
let mut param_name = String::new();
let mut param_pattern = String::from(DEFAULT_PATTERN);
let mut is_dynamic = false;
let mut elems = Vec::new();
let mut len = 0;
for ch in pattern.chars() {
if in_param {
// In parameter segment: `{....}`
if ch == '}' {
elems.push(PatternElement::Var(param_name.clone()));
re1.push_str(&format!(r"(?P<{}>{})", &param_name, &param_pattern));
param_name.clear();
param_pattern = String::from(DEFAULT_PATTERN);
len = 0;
in_param_pattern = false;
in_param = false;
} else if ch == ':' {
// The parameter name has been determined; custom pattern land
in_param_pattern = true;
param_pattern.clear();
} else if in_param_pattern {
// Ignore leading whitespace for pattern
if !(ch == ' ' && param_pattern.is_empty()) {
param_pattern.push(ch);
}
} else {
param_name.push(ch);
let mut params_nesting = 0usize;
let close_idx = pattern
.find(|c| match c {
'{' => {
params_nesting += 1;
false
}
} else if ch == '{' {
in_param = true;
is_dynamic = true;
elems.push(PatternElement::Str(el.clone()));
el.clear();
} else {
re1.push_str(escape(&ch.to_string()).as_str());
re2.push(ch);
el.push(ch);
len += 1;
'}' => {
params_nesting -= 1;
params_nesting == 0
}
_ => false,
}).expect("malformed param");
let (mut param, rem) = pattern.split_at(close_idx + 1);
param = &param[1..param.len() - 1]; // Remove outer brackets
let (name, pattern) = match param.find(':') {
Some(idx) => {
let (name, pattern) = param.split_at(idx);
(name, &pattern[1..])
}
}
if !el.is_empty() {
elems.push(PatternElement::Str(el.clone()));
}
let re = if is_dynamic {
if !for_prefix {
re1.push('$');
}
re1
} else {
re2
None => (param, DEFAULT_PATTERN),
};
(re, elems, is_dynamic, len)
(
PatternElement::Var(name.to_string()),
format!(r"(?P<{}>{})", &name, &pattern),
rem,
)
}
fn parse(
mut pattern: &str, for_prefix: bool,
) -> (String, Vec<PatternElement>, bool, usize) {
if pattern.find('{').is_none() {
return (
String::from(pattern),
vec![PatternElement::Str(String::from(pattern))],
false,
pattern.chars().count(),
);
};
let mut elems = Vec::new();
let mut re = String::from("^");
while let Some(idx) = pattern.find('{') {
let (prefix, rem) = pattern.split_at(idx);
elems.push(PatternElement::Str(String::from(prefix)));
re.push_str(&escape(prefix));
let (param_pattern, re_part, rem) = Self::parse_param(rem);
elems.push(param_pattern);
re.push_str(&re_part);
pattern = rem;
}
elems.push(PatternElement::Str(String::from(pattern)));
re.push_str(&escape(pattern));
if !for_prefix {
re.push_str("$");
}
(re, elems, true, pattern.chars().count())
}
}
@@ -1072,6 +1069,16 @@ mod tests {
let info = re.match_with_params(&req, 0).unwrap();
assert_eq!(info.get("version").unwrap(), "151");
assert_eq!(info.get("id").unwrap(), "adahg32");
let re = ResourceDef::new("/{id:[[:digit:]]{6}}");
assert!(re.is_match("/012345"));
assert!(!re.is_match("/012"));
assert!(!re.is_match("/01234567"));
assert!(!re.is_match("/XXXXXX"));
let req = TestRequest::with_uri("/012345").finish();
let info = re.match_with_params(&req, 0).unwrap();
assert_eq!(info.get("id").unwrap(), "012345");
}
#[test]

View File

@@ -5,7 +5,10 @@ use std::rc::Rc;
use futures::{Async, Future, Poll};
use error::Error;
use handler::{AsyncResult, AsyncResultItem, FromRequest, Responder, RouteHandler};
use handler::{
AsyncResult, AsyncResultItem, FromRequest, Handler, Responder, RouteHandler,
WrapHandler,
};
use http::Method;
use httprequest::HttpRequest;
use httpresponse::HttpResponse;
@@ -180,7 +183,7 @@ impl<S: 'static> Scope<S> {
where
F: FnOnce(Scope<S>) -> Scope<S>,
{
let rdef = ResourceDef::prefix(&path);
let rdef = ResourceDef::prefix(&insert_slash(path));
let scope = Scope {
rdef: rdef.clone(),
filters: Vec::new(),
@@ -227,9 +230,11 @@ impl<S: 'static> Scope<S> {
R: Responder + 'static,
T: FromRequest<S> + 'static,
{
Rc::get_mut(&mut self.router)
.unwrap()
.register_route(path, method, f);
Rc::get_mut(&mut self.router).unwrap().register_route(
&insert_slash(path),
method,
f,
);
self
}
@@ -261,7 +266,7 @@ impl<S: 'static> Scope<S> {
F: FnOnce(&mut Resource<S>) -> R + 'static,
{
// add resource
let mut resource = Resource::new(ResourceDef::new(path));
let mut resource = Resource::new(ResourceDef::new(&insert_slash(path)));
f(&mut resource);
Rc::get_mut(&mut self.router)
@@ -286,6 +291,35 @@ impl<S: 'static> Scope<S> {
self
}
/// Configure handler for specific path prefix.
///
/// A path prefix consists of valid path segments, i.e for the
/// prefix `/app` any request with the paths `/app`, `/app/` or
/// `/app/test` would match, but the path `/application` would
/// not.
///
/// ```rust
/// # extern crate actix_web;
/// use actix_web::{http, App, HttpRequest, HttpResponse};
///
/// fn main() {
/// let app = App::new().scope("/scope-prefix", |scope| {
/// scope.handler("/app", |req: &HttpRequest| match *req.method() {
/// http::Method::GET => HttpResponse::Ok(),
/// http::Method::POST => HttpResponse::MethodNotAllowed(),
/// _ => HttpResponse::NotFound(),
/// })
/// });
/// }
/// ```
pub fn handler<H: Handler<S>>(mut self, path: &str, handler: H) -> Scope<S> {
let path = insert_slash(path.trim().trim_right_matches('/'));
Rc::get_mut(&mut self.router)
.expect("Multiple copies of scope router")
.register_handler(&path, Box::new(WrapHandler::new(handler)), None);
self
}
/// Register a scope middleware
///
/// This is similar to `App's` middlewares, but
@@ -301,6 +335,14 @@ impl<S: 'static> Scope<S> {
}
}
fn insert_slash(path: &str) -> String {
let mut path = path.to_owned();
if !path.is_empty() && !path.starts_with('/') {
path.insert(0, '/');
};
path
}
impl<S: 'static> RouteHandler<S> for Scope<S> {
fn handle(&self, req: &HttpRequest<S>) -> AsyncResult<HttpResponse> {
let tail = req.match_info().tail as usize;
@@ -779,11 +821,37 @@ mod tests {
scope
.route("/path1", Method::GET, |_: HttpRequest<_>| {
HttpResponse::Ok()
}).route(
"/path1",
Method::DELETE,
|_: HttpRequest<_>| HttpResponse::Ok(),
)
}).route("/path1", Method::DELETE, |_: HttpRequest<_>| {
HttpResponse::Ok()
})
}).finish();
let req = TestRequest::with_uri("/app/path1").request();
let resp = app.run(req);
assert_eq!(resp.as_msg().status(), StatusCode::OK);
let req = TestRequest::with_uri("/app/path1")
.method(Method::DELETE)
.request();
let resp = app.run(req);
assert_eq!(resp.as_msg().status(), StatusCode::OK);
let req = TestRequest::with_uri("/app/path1")
.method(Method::POST)
.request();
let resp = app.run(req);
assert_eq!(resp.as_msg().status(), StatusCode::NOT_FOUND);
}
#[test]
fn test_scope_route_without_leading_slash() {
let app = App::new()
.scope("app", |scope| {
scope
.route("path1", Method::GET, |_: HttpRequest<_>| HttpResponse::Ok())
.route("path1", Method::DELETE, |_: HttpRequest<_>| {
HttpResponse::Ok()
})
}).finish();
let req = TestRequest::with_uri("/app/path1").request();
@@ -972,6 +1040,20 @@ mod tests {
assert_eq!(resp.as_msg().status(), StatusCode::CREATED);
}
#[test]
fn test_nested_scope_no_slash() {
let app = App::new()
.scope("/app", |scope| {
scope.nested("t1", |scope| {
scope.resource("/path1", |r| r.f(|_| HttpResponse::Created()))
})
}).finish();
let req = TestRequest::with_uri("/app/t1/path1").request();
let resp = app.run(req);
assert_eq!(resp.as_msg().status(), StatusCode::CREATED);
}
#[test]
fn test_nested_scope_root() {
let app = App::new()
@@ -1120,4 +1202,32 @@ mod tests {
let resp = app.run(req);
assert_eq!(resp.as_msg().status(), StatusCode::METHOD_NOT_ALLOWED);
}
#[test]
fn test_handler() {
let app = App::new()
.scope("/scope", |scope| {
scope.handler("/test", |_: &_| HttpResponse::Ok())
}).finish();
let req = TestRequest::with_uri("/scope/test").request();
let resp = app.run(req);
assert_eq!(resp.as_msg().status(), StatusCode::OK);
let req = TestRequest::with_uri("/scope/test/").request();
let resp = app.run(req);
assert_eq!(resp.as_msg().status(), StatusCode::OK);
let req = TestRequest::with_uri("/scope/test/app").request();
let resp = app.run(req);
assert_eq!(resp.as_msg().status(), StatusCode::OK);
let req = TestRequest::with_uri("/scope/testapp").request();
let resp = app.run(req);
assert_eq!(resp.as_msg().status(), StatusCode::NOT_FOUND);
let req = TestRequest::with_uri("/scope/blah").request();
let resp = app.run(req);
assert_eq!(resp.as_msg().status(), StatusCode::NOT_FOUND);
}
}

View File

@@ -451,10 +451,13 @@ impl Accept {
Delay::new(
Instant::now() + Duration::from_millis(510),
).map_err(|_| ())
.and_then(move |_| {
let _ = r.set_readiness(mio::Ready::readable());
Ok(())
}),
.and_then(
move |_| {
let _ =
r.set_readiness(mio::Ready::readable());
Ok(())
},
),
);
Ok(())
},

View File

@@ -5,6 +5,7 @@ use std::{io, ptr, time};
use bytes::{Buf, BufMut, BytesMut};
use futures::{Async, Future, Poll};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::Delay;
use super::settings::WorkerSettings;
use super::{h1, h2, ConnectionTag, HttpHandler, IoStream};
@@ -30,6 +31,7 @@ where
{
proto: Option<HttpProtocol<T, H>>,
node: Option<Node<HttpChannel<T, H>>>,
ka_timeout: Option<Delay>,
_tag: ConnectionTag,
}
@@ -42,9 +44,11 @@ where
settings: Rc<WorkerSettings<H>>, io: T, peer: Option<SocketAddr>,
) -> HttpChannel<T, H> {
let _tag = settings.connection();
let ka_timeout = settings.keep_alive_timer();
HttpChannel {
_tag,
ka_timeout,
node: None,
proto: Some(HttpProtocol::Unknown(
settings,
@@ -55,7 +59,7 @@ where
}
}
fn shutdown(&mut self) {
pub(crate) fn shutdown(&mut self) {
match self.proto {
Some(HttpProtocol::H1(ref mut h1)) => {
let io = h1.io();
@@ -68,6 +72,18 @@ where
}
}
impl<T, H> Drop for HttpChannel<T, H>
where
T: IoStream,
H: HttpHandler + 'static,
{
fn drop(&mut self) {
if let Some(mut node) = self.node.take() {
node.remove()
}
}
}
impl<T, H> Future for HttpChannel<T, H>
where
T: IoStream,
@@ -77,7 +93,19 @@ where
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if self.node.is_some() {
// keep-alive timer
if let Some(ref mut timer) = self.ka_timeout {
match timer.poll() {
Ok(Async::Ready(_)) => {
trace!("Slow request timed out, close connection");
return Ok(Async::Ready(()));
}
Ok(Async::NotReady) => (),
Err(_) => panic!("Something is really wrong"),
}
}
if self.node.is_none() {
let el = self as *mut _;
self.node = Some(Node::new(el));
let _ = match self.proto {
@@ -94,41 +122,32 @@ where
};
}
let mut is_eof = false;
let kind = match self.proto {
Some(HttpProtocol::H1(ref mut h1)) => {
let result = h1.poll();
match result {
Ok(Async::Ready(())) | Err(_) => {
if let Some(n) = self.node.as_mut() {
n.remove()
};
}
_ => (),
}
return result;
return h1.poll();
}
Some(HttpProtocol::H2(ref mut h2)) => {
let result = h2.poll();
match result {
Ok(Async::Ready(())) | Err(_) => {
if let Some(n) = self.node.as_mut() {
n.remove()
};
return h2.poll();
}
Some(HttpProtocol::Unknown(_, _, ref mut io, ref mut buf)) => {
let mut disconnect = false;
match io.read_available(buf) {
Ok(Async::Ready((read_some, stream_closed))) => {
is_eof = stream_closed;
// Only disconnect if no data was read.
if is_eof && !read_some {
disconnect = true;
}
}
Err(_) => {
disconnect = true;
}
_ => (),
}
return result;
}
Some(HttpProtocol::Unknown(_, _, ref mut io, ref mut buf)) => {
match io.read_available(buf) {
Ok(Async::Ready(true)) | Err(_) => {
debug!("Ignored premature client disconnection");
if let Some(n) = self.node.as_mut() {
n.remove()
};
return Err(());
}
_ => (),
if disconnect {
debug!("Ignored premature client disconnection");
return Err(());
}
if buf.len() >= 14 {
@@ -148,8 +167,14 @@ where
if let Some(HttpProtocol::Unknown(settings, addr, io, buf)) = self.proto.take() {
match kind {
ProtocolKind::Http1 => {
self.proto =
Some(HttpProtocol::H1(h1::Http1::new(settings, io, addr, buf)));
self.proto = Some(HttpProtocol::H1(h1::Http1::new(
settings,
io,
addr,
buf,
is_eof,
self.ka_timeout.take(),
)));
return self.poll();
}
ProtocolKind::Http2 => {
@@ -158,6 +183,7 @@ where
io,
addr,
buf.freeze(),
self.ka_timeout.take(),
)));
return self.poll();
}
@@ -182,13 +208,14 @@ impl<T> Node<T> {
}
}
fn insert<I>(&mut self, next: &mut Node<I>) {
fn insert<I>(&mut self, next_el: &mut Node<I>) {
unsafe {
let next: *mut Node<T> = next as *const _ as *mut _;
let next: *mut Node<T> = next_el as *const _ as *mut _;
if let Some(ref mut next2) = self.next {
if let Some(next2) = self.next {
let n = next2.as_mut().unwrap();
n.prev = Some(next);
next_el.next = Some(next2 as *mut _);
}
self.next = Some(next);
@@ -201,11 +228,14 @@ impl<T> Node<T> {
unsafe {
self.element = ptr::null_mut();
let next = self.next.take();
let mut prev = self.prev.take();
let prev = self.prev.take();
if let Some(ref mut prev) = prev {
if let Some(prev) = prev {
prev.as_mut().unwrap().next = next;
}
if let Some(next) = next {
next.as_mut().unwrap().prev = prev;
}
}
}
}
@@ -219,7 +249,7 @@ impl Node<()> {
}
}
pub(crate) fn traverse<T, H>(&self)
pub(crate) fn traverse<T, H, F: Fn(&mut HttpChannel<T, H>)>(&self, f: F)
where
T: IoStream,
H: HttpHandler + 'static,
@@ -234,7 +264,7 @@ impl Node<()> {
if !n.element.is_null() {
let ch: &mut HttpChannel<T, H> =
&mut *(&mut *(n.element as *mut _) as *mut () as *mut _);
ch.shutdown();
f(ch);
}
}
} else {

View File

@@ -21,7 +21,12 @@ impl HttpHandlerTask for ServerError {
bytes.reserve(helpers::STATUS_LINE_BUF_SIZE + 1);
helpers::write_status_line(self.0, self.1.as_u16(), bytes);
}
// Convert Status Code to Reason.
let reason = self.1.canonical_reason().unwrap_or("");
io.buffer().extend_from_slice(reason.as_bytes());
// No response body.
io.buffer().extend_from_slice(b"\r\ncontent-length: 0\r\n");
// date header
io.set_date();
Ok(Async::Ready(true))
}

View File

@@ -22,13 +22,14 @@ use super::{HttpHandler, HttpHandlerTask, IoStream};
const MAX_PIPELINED_MESSAGES: usize = 16;
bitflags! {
struct Flags: u8 {
const STARTED = 0b0000_0001;
const ERROR = 0b0000_0010;
const KEEPALIVE = 0b0000_0100;
const SHUTDOWN = 0b0000_1000;
const DISCONNECTED = 0b0001_0000;
const POLLED = 0b0010_0000;
pub struct Flags: u8 {
const STARTED = 0b0000_0001;
const ERROR = 0b0000_0010;
const KEEPALIVE = 0b0000_0100;
const SHUTDOWN = 0b0000_1000;
const READ_DISCONNECTED = 0b0001_0000;
const WRITE_DISCONNECTED = 0b0010_0000;
const POLLED = 0b0100_0000;
}
}
@@ -90,18 +91,22 @@ where
{
pub fn new(
settings: Rc<WorkerSettings<H>>, stream: T, addr: Option<SocketAddr>,
buf: BytesMut,
buf: BytesMut, is_eof: bool, keepalive_timer: Option<Delay>,
) -> Self {
Http1 {
flags: Flags::KEEPALIVE,
flags: if is_eof {
Flags::READ_DISCONNECTED
} else {
Flags::KEEPALIVE
},
stream: H1Writer::new(stream, Rc::clone(&settings)),
decoder: H1Decoder::new(),
payload: None,
tasks: VecDeque::new(),
keepalive_timer: None,
addr,
buf,
settings,
keepalive_timer,
}
}
@@ -117,6 +122,13 @@ where
#[inline]
fn can_read(&self) -> bool {
if self
.flags
.intersects(Flags::ERROR | Flags::READ_DISCONNECTED)
{
return false;
}
if let Some(ref info) = self.payload {
info.need_read() == PayloadStatus::Read
} else {
@@ -125,6 +137,8 @@ where
}
fn notify_disconnect(&mut self) {
self.flags.insert(Flags::WRITE_DISCONNECTED);
// notify all tasks
self.stream.disconnected();
for task in &mut self.tasks {
@@ -132,6 +146,21 @@ where
}
}
fn client_disconnect(&mut self) {
// notify all tasks
self.notify_disconnect();
// kill keepalive
self.keepalive_timer.take();
// on parse error, stop reading stream but tasks need to be
// completed
self.flags.insert(Flags::ERROR);
if let Some(mut payload) = self.payload.take() {
payload.set_error(PayloadError::Incomplete);
}
}
#[inline]
pub fn poll(&mut self) -> Poll<(), ()> {
// keep-alive timer
@@ -148,6 +177,11 @@ where
// shutdown
if self.flags.contains(Flags::SHUTDOWN) {
if self.flags.intersects(
Flags::ERROR | Flags::READ_DISCONNECTED | Flags::WRITE_DISCONNECTED,
) {
return Ok(Async::Ready(()));
}
match self.stream.poll_completed(true) {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(_)) => return Ok(Async::Ready(())),
@@ -182,44 +216,25 @@ where
self.flags.insert(Flags::POLLED);
return;
}
// read io from socket
if !self.flags.intersects(Flags::ERROR)
&& self.tasks.len() < MAX_PIPELINED_MESSAGES
&& self.can_read()
{
if self.can_read() && self.tasks.len() < MAX_PIPELINED_MESSAGES {
match self.stream.get_mut().read_available(&mut self.buf) {
Ok(Async::Ready(disconnected)) => {
if disconnected {
// notify all tasks
self.notify_disconnect();
// kill keepalive
self.keepalive_timer.take();
// on parse error, stop reading stream but tasks need to be
// completed
self.flags.insert(Flags::ERROR);
if let Some(mut payload) = self.payload.take() {
payload.set_error(PayloadError::Incomplete);
}
} else {
Ok(Async::Ready((read_some, disconnected))) => {
if read_some {
self.parse();
}
if disconnected {
// delay disconnect until all tasks have finished.
self.flags.insert(Flags::READ_DISCONNECTED);
if self.tasks.is_empty() {
self.client_disconnect();
}
}
}
Ok(Async::NotReady) => (),
Err(_) => {
// notify all tasks
self.notify_disconnect();
// kill keepalive
self.keepalive_timer.take();
// on parse error, stop reading stream but tasks need to be
// completed
self.flags.insert(Flags::ERROR);
if let Some(mut payload) = self.payload.take() {
payload.set_error(PayloadError::Incomplete);
}
self.client_disconnect();
}
}
}
@@ -233,7 +248,10 @@ where
let mut idx = 0;
while idx < self.tasks.len() {
// only one task can do io operation in http/1
if !io && !self.tasks[idx].flags.contains(EntryFlags::EOF) {
if !io
&& !self.tasks[idx].flags.contains(EntryFlags::EOF)
&& !self.flags.contains(Flags::WRITE_DISCONNECTED)
{
// io is corrupted, send buffer
if self.tasks[idx].flags.contains(EntryFlags::ERROR) {
if let Ok(Async::NotReady) = self.stream.poll_completed(true) {
@@ -297,7 +315,6 @@ where
}
// cleanup finished tasks
let max = self.tasks.len() >= MAX_PIPELINED_MESSAGES;
while !self.tasks.is_empty() {
if self.tasks[0]
.flags
@@ -308,10 +325,6 @@ where
break;
}
}
// read more message
if max && self.tasks.len() >= MAX_PIPELINED_MESSAGES {
return Ok(Async::Ready(true));
}
// check stream state
if self.flags.contains(Flags::STARTED) {
@@ -331,8 +344,12 @@ where
}
}
// deal with keep-alive
// deal with keep-alive and steam eof (client-side write shutdown)
if self.tasks.is_empty() {
// handle stream eof
if self.flags.contains(Flags::READ_DISCONNECTED) {
return Ok(Async::Ready(false));
}
// no keep-alive
if self.flags.contains(Flags::ERROR)
|| (!self.flags.contains(Flags::KEEPALIVE)
@@ -347,7 +364,7 @@ where
if self.keepalive_timer.is_none() && keep_alive > 0 {
trace!("Start keep-alive timer");
let mut timer =
Delay::new(Instant::now() + Duration::new(keep_alive, 0));
Delay::new(Instant::now() + Duration::from_secs(keep_alive));
// register timer
let _ = timer.poll();
self.keepalive_timer = Some(timer);
@@ -448,7 +465,14 @@ where
break;
}
}
Ok(None) => break,
Ok(None) => {
if self.flags.contains(Flags::READ_DISCONNECTED)
&& self.tasks.is_empty()
{
self.client_disconnect();
}
break;
}
Err(e) => {
self.flags.insert(Flags::ERROR);
if let Some(mut payload) = self.payload.take() {
@@ -603,24 +627,36 @@ mod tests {
}
#[test]
fn test_req_parse() {
fn test_req_parse1() {
let buf = Buffer::new("GET /test HTTP/1.1\r\n\r\n");
let readbuf = BytesMut::new();
let settings = Rc::new(wrk_settings());
let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf);
let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf, false, None);
h1.poll_io();
h1.poll_io();
assert_eq!(h1.tasks.len(), 1);
}
#[test]
fn test_req_parse2() {
let buf = Buffer::new("");
let readbuf =
BytesMut::from(Vec::<u8>::from(&b"GET /test HTTP/1.1\r\n\r\n"[..]));
let settings = Rc::new(wrk_settings());
let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf, true, None);
h1.poll_io();
assert_eq!(h1.tasks.len(), 1);
}
#[test]
fn test_req_parse_err() {
let buf = Buffer::new("GET /test HTTP/1\r\n\r\n");
let readbuf = BytesMut::new();
let settings = Rc::new(wrk_settings());
let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf);
let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf, false, None);
h1.poll_io();
h1.poll_io();
assert!(h1.flags.contains(Flags::ERROR));

View File

@@ -63,7 +63,9 @@ impl<T: AsyncWrite, H: 'static> H1Writer<T, H> {
self.flags = Flags::KEEPALIVE;
}
pub fn disconnected(&mut self) {}
pub fn disconnected(&mut self) {
self.flags.insert(Flags::DISCONNECTED);
}
pub fn keepalive(&self) -> bool {
self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE)
@@ -268,10 +270,7 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
let pl: &[u8] = payload.as_ref();
let n = match Self::write_data(&mut self.stream, pl) {
Err(err) => {
if err.kind() == io::ErrorKind::WriteZero {
self.disconnected();
}
self.disconnected();
return Err(err);
}
Ok(val) => val,
@@ -315,14 +314,15 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
#[inline]
fn poll_completed(&mut self, shutdown: bool) -> Poll<(), io::Error> {
if self.flags.contains(Flags::DISCONNECTED) {
return Err(io::Error::new(io::ErrorKind::Other, "disconnected"));
}
if !self.buffer.is_empty() {
let written = {
match Self::write_data(&mut self.stream, self.buffer.as_ref().as_ref()) {
Err(err) => {
if err.kind() == io::ErrorKind::WriteZero {
self.disconnected();
}
self.disconnected();
return Err(err);
}
Ok(val) => val,
@@ -339,7 +339,7 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
self.stream.poll_flush()?;
self.stream.shutdown()
} else {
self.stream.poll_flush()
Ok(self.stream.poll_flush()?)
}
}
}

View File

@@ -59,6 +59,7 @@ where
{
pub fn new(
settings: Rc<WorkerSettings<H>>, io: T, addr: Option<SocketAddr>, buf: Bytes,
keepalive_timer: Option<Delay>,
) -> Self {
let extensions = io.extensions();
Http2 {
@@ -68,10 +69,10 @@ where
unread: if buf.is_empty() { None } else { Some(buf) },
inner: io,
})),
keepalive_timer: None,
addr,
settings,
extensions,
keepalive_timer,
}
}

View File

@@ -8,7 +8,6 @@ use actix::{Actor, Addr, Arbiter, AsyncContext, Context, Handler, System};
use futures::{Future, Stream};
use net2::{TcpBuilder, TcpStreamExt};
use num_cpus;
use tokio::executor::current_thread;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_tcp::TcpStream;
@@ -71,9 +70,9 @@ where
factory: Arc::new(f),
host: None,
backlog: 2048,
keep_alive: KeepAlive::Os,
keep_alive: KeepAlive::Timeout(5),
shutdown_timeout: 30,
exit: true,
exit: false,
no_http2: false,
no_signals: false,
maxconn: 102_400,
@@ -132,7 +131,7 @@ where
/// Set server keep-alive setting.
///
/// By default keep alive is set to a `Os`.
/// By default keep alive is set to a 5 seconds.
pub fn keep_alive<T: Into<KeepAlive>>(mut self, val: T) -> Self {
self.keep_alive = val.into();
self
@@ -637,7 +636,9 @@ where
fn shutdown(&self, force: bool) {
if force {
self.settings.head().traverse::<TcpStream, H>();
self.settings
.head()
.traverse(|ch: &mut HttpChannel<TcpStream, H>| ch.shutdown());
}
}
}
@@ -693,7 +694,7 @@ where
};
let _ = io.set_nodelay(true);
current_thread::spawn(HttpChannel::new(h, io, peer));
Arbiter::spawn(HttpChannel::new(h, io, peer));
}
}
@@ -753,10 +754,10 @@ where
let _ = io.set_nodelay(true);
let rate = h.connection_rate();
current_thread::spawn(self.acceptor.accept(io).then(move |res| {
Arbiter::spawn(self.acceptor.accept(io).then(move |res| {
drop(rate);
match res {
Ok(io) => current_thread::spawn(HttpChannel::new(h, io, peer)),
Ok(io) => Arbiter::spawn(HttpChannel::new(h, io, peer)),
Err(err) => trace!("Can not establish connection: {}", err),
}
Ok(())

View File

@@ -390,7 +390,7 @@ pub trait IoStream: AsyncRead + AsyncWrite + 'static {
fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()>;
fn read_available(&mut self, buf: &mut BytesMut) -> Poll<bool, io::Error> {
fn read_available(&mut self, buf: &mut BytesMut) -> Poll<(bool, bool), io::Error> {
let mut read_some = false;
loop {
if buf.remaining_mut() < LW_BUFFER_SIZE {
@@ -400,7 +400,7 @@ pub trait IoStream: AsyncRead + AsyncWrite + 'static {
match self.read(buf.bytes_mut()) {
Ok(n) => {
if n == 0 {
return Ok(Async::Ready(!read_some));
return Ok(Async::Ready((read_some, true)));
} else {
read_some = true;
buf.advance_mut(n);
@@ -409,7 +409,7 @@ pub trait IoStream: AsyncRead + AsyncWrite + 'static {
Err(e) => {
return if e.kind() == io::ErrorKind::WouldBlock {
if read_some {
Ok(Async::Ready(false))
Ok(Async::Ready((read_some, false)))
} else {
Ok(Async::NotReady)
}

View File

@@ -217,7 +217,7 @@ impl Server {
// start accept thread
for sock in &self.sockets {
for s in sock.iter() {
info!("Starting server on http://{:?}", s.1.local_addr().ok());
info!("Starting server on http://{}", s.1.local_addr().unwrap());
}
}
let rx = self

View File

@@ -13,7 +13,7 @@ use http::StatusCode;
use lazycell::LazyCell;
use parking_lot::Mutex;
use time;
use tokio_timer::Interval;
use tokio_timer::{Delay, Interval};
use super::channel::Node;
use super::message::{Request, RequestPool};
@@ -197,6 +197,16 @@ impl<H> WorkerSettings<H> {
&self.h
}
pub fn keep_alive_timer(&self) -> Option<Delay> {
if self.keep_alive != 0 {
Some(Delay::new(
Instant::now() + Duration::from_secs(self.keep_alive),
))
} else {
None
}
}
pub fn keep_alive(&self) -> u64 {
self.keep_alive
}

View File

@@ -120,6 +120,7 @@ impl TestServer {
HttpServer::new(factory)
.disable_signals()
.listen(tcp)
.keep_alive(5)
.start();
tx.send((System::current(), local_addr, TestServer::get_conn()))
@@ -328,6 +329,7 @@ impl<S: 'static> TestServerBuilder<S> {
config(&mut app);
vec![app]
}).workers(1)
.keep_alive(5)
.disable_signals();
tx.send((System::current(), addr, TestServer::get_conn()))

View File

@@ -8,7 +8,8 @@ extern crate rand;
#[cfg(all(unix, feature = "uds"))]
extern crate tokio_uds;
use std::io::Read;
use std::io::{Read, Write};
use std::{net, thread};
use bytes::Bytes;
use flate2::read::GzDecoder;
@@ -66,6 +67,16 @@ fn test_simple() {
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
}
#[test]
fn test_connection_close() {
let mut srv =
test::TestServer::new(|app| app.handler(|_| HttpResponse::Ok().body(STR)));
let request = srv.get().header("Connection", "close").finish().unwrap();
let response = srv.execute(request.send()).unwrap();
assert!(response.status().is_success());
}
#[test]
fn test_with_query_parameter() {
let mut srv = test::TestServer::new(|app| {
@@ -396,24 +407,29 @@ fn test_client_cookie_handling() {
let cookie2 = cookie2b.clone();
app.handler(move |req: &HttpRequest| {
// Check cookies were sent correctly
req.cookie("cookie1").ok_or_else(err)
.and_then(|c1| if c1.value() == "value1" {
req.cookie("cookie1")
.ok_or_else(err)
.and_then(|c1| {
if c1.value() == "value1" {
Ok(())
} else {
Err(err())
})
.and_then(|()| req.cookie("cookie2").ok_or_else(err))
.and_then(|c2| if c2.value() == "value2" {
}
}).and_then(|()| req.cookie("cookie2").ok_or_else(err))
.and_then(|c2| {
if c2.value() == "value2" {
Ok(())
} else {
Err(err())
})
// Send some cookies back
.map(|_| HttpResponse::Ok()
.cookie(cookie1.clone())
.cookie(cookie2.clone())
.finish()
)
}
})
// Send some cookies back
.map(|_| {
HttpResponse::Ok()
.cookie(cookie1.clone())
.cookie(cookie2.clone())
.finish()
})
})
});
@@ -460,3 +476,33 @@ fn test_default_headers() {
"\""
)));
}
#[test]
fn client_read_until_eof() {
let addr = test::TestServer::unused_addr();
thread::spawn(move || {
let lst = net::TcpListener::bind(addr).unwrap();
for stream in lst.incoming() {
let mut stream = stream.unwrap();
let mut b = [0; 1000];
let _ = stream.read(&mut b).unwrap();
let _ = stream
.write_all(b"HTTP/1.1 200 OK\r\nconnection: close\r\n\r\nwelcome!");
}
});
let mut sys = actix::System::new("test");
// client request
let req = client::ClientRequest::get(format!("http://{}/", addr).as_str())
.finish()
.unwrap();
let response = sys.block_on(req.send()).unwrap();
assert!(response.status().is_success());
// read response
let bytes = sys.block_on(response.body()).unwrap();
assert_eq!(bytes, Bytes::from_static(b"welcome!"));
}

View File

@@ -932,6 +932,28 @@ fn test_application() {
assert!(response.status().is_success());
}
#[test]
fn test_default_404_handler_response() {
let mut srv = test::TestServer::with_factory(|| {
App::new()
.prefix("/app")
.resource("", |r| r.f(|_| HttpResponse::Ok()))
.resource("/", |r| r.f(|_| HttpResponse::Ok()))
});
let addr = srv.addr();
let mut buf = [0; 24];
let request = TcpStream::connect(&addr)
.and_then(|sock| {
tokio::io::write_all(sock, "HEAD / HTTP/1.1\r\nHost: localhost\r\n\r\n")
.and_then(|(sock, _)| tokio::io::read_exact(sock, &mut buf))
.and_then(|(_, buf)| Ok(buf))
}).map_err(|e| panic!("{:?}", e));
let response = srv.execute(request).unwrap();
let rep = String::from_utf8_lossy(&response[..]);
assert!(rep.contains("HTTP/1.1 404 Not Found"));
}
#[test]
fn test_server_cookies() {
use actix_web::http;