mirror of
https://github.com/fafhrd91/actix-web
synced 2025-07-16 22:55:47 +02:00
Compare commits
51 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
f40153fca4 | ||
|
764103566d | ||
|
bfb2f2e9e1 | ||
|
599e6b3385 | ||
|
03e318f446 | ||
|
7449884ce3 | ||
|
bbe69e5b8d | ||
|
9d1eefc38f | ||
|
d65c72b44d | ||
|
c3f8b5cf22 | ||
|
70a3f317d3 | ||
|
513c8ec1ce | ||
|
04608b2ea6 | ||
|
70b45659e2 | ||
|
e0ae6b10cd | ||
|
003b05b095 | ||
|
cdb57b840e | ||
|
002bb24b26 | ||
|
51982b3fec | ||
|
4251b0bc10 | ||
|
42f3773bec | ||
|
86fdbb47a5 | ||
|
4ca9fd2ad1 | ||
|
f0f67072ae | ||
|
24d1228943 | ||
|
b7a73e0a4f | ||
|
968c81e267 | ||
|
d5957a8466 | ||
|
f2f05e7715 | ||
|
3439f55288 | ||
|
0425e2776f | ||
|
6464f96f8b | ||
|
a2b170fec9 | ||
|
0b42cae082 | ||
|
c313c003a4 | ||
|
3fa23f5e10 | ||
|
2d51831899 | ||
|
e59abfd716 | ||
|
66881d7dd1 | ||
|
a42a8a2321 | ||
|
2341656173 | ||
|
487519acec | ||
|
af6caa92c8 | ||
|
3ccbce6bc8 | ||
|
797b52ecbf | ||
|
4bab50c861 | ||
|
5906971b6d | ||
|
8393d09a0f | ||
|
c3ae9997fc | ||
|
d39dcc58cd | ||
|
471a3e9806 |
58
CHANGES.md
58
CHANGES.md
@@ -1,5 +1,63 @@
|
||||
# Changes
|
||||
|
||||
## [0.7.8] - 2018-09-17
|
||||
|
||||
### Added
|
||||
|
||||
* Use server `Keep-Alive` setting as slow request timeout #439
|
||||
|
||||
### Changed
|
||||
|
||||
* Use 5 seconds keep-alive timer by default.
|
||||
|
||||
### Fixed
|
||||
|
||||
* Fixed wrong error message for i16 type #510
|
||||
|
||||
|
||||
## [0.7.7] - 2018-09-11
|
||||
|
||||
### Fixed
|
||||
|
||||
* Fix linked list of HttpChannels #504
|
||||
|
||||
* Fix requests to TestServer fail #508
|
||||
|
||||
|
||||
## [0.7.6] - 2018-09-07
|
||||
|
||||
### Fixed
|
||||
|
||||
* Fix system_exit in HttpServer #501
|
||||
|
||||
* Fix parsing of route param containin regexes with repetition #500
|
||||
|
||||
### Changes
|
||||
|
||||
* Unhide `SessionBackend` and `SessionImpl` traits #455
|
||||
|
||||
|
||||
## [0.7.5] - 2018-09-04
|
||||
|
||||
### Added
|
||||
|
||||
* Added the ability to pass a custom `TlsConnector`.
|
||||
|
||||
* Allow to register handlers on scope level #465
|
||||
|
||||
|
||||
### Fixed
|
||||
|
||||
* Handle socket read disconnect
|
||||
|
||||
* Handling scoped paths without leading slashes #460
|
||||
|
||||
|
||||
### Changed
|
||||
|
||||
* Read client response until eof if connection header set to close #464
|
||||
|
||||
|
||||
## [0.7.4] - 2018-08-23
|
||||
|
||||
### Added
|
||||
|
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "actix-web"
|
||||
version = "0.7.4"
|
||||
version = "0.7.8"
|
||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
description = "Actix web is a simple, pragmatic and extremely fast web framework for Rust."
|
||||
readme = "README.md"
|
||||
@@ -101,6 +101,7 @@ tokio-io = "0.1"
|
||||
tokio-tcp = "0.1"
|
||||
tokio-timer = "0.2"
|
||||
tokio-reactor = "0.1"
|
||||
tokio-current-thread = "0.1"
|
||||
|
||||
# native-tls
|
||||
native-tls = { version="0.2", optional = true }
|
||||
|
@@ -447,11 +447,8 @@ where
|
||||
{
|
||||
let mut path = path.trim().trim_right_matches('/').to_owned();
|
||||
if !path.is_empty() && !path.starts_with('/') {
|
||||
path.insert(0, '/')
|
||||
}
|
||||
if path.len() > 1 && path.ends_with('/') {
|
||||
path.pop();
|
||||
}
|
||||
path.insert(0, '/');
|
||||
};
|
||||
self.parts
|
||||
.as_mut()
|
||||
.expect("Use after finish")
|
||||
|
@@ -17,57 +17,34 @@ use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use tokio_timer::Delay;
|
||||
|
||||
#[cfg(feature = "alpn")]
|
||||
use openssl::ssl::{Error as OpensslError, SslConnector, SslMethod};
|
||||
#[cfg(feature = "alpn")]
|
||||
use tokio_openssl::SslConnectorExt;
|
||||
use {
|
||||
openssl::ssl::{Error as SslError, SslConnector, SslMethod},
|
||||
tokio_openssl::SslConnectorExt,
|
||||
};
|
||||
|
||||
#[cfg(all(feature = "tls", not(feature = "alpn")))]
|
||||
use native_tls::{Error as TlsError, TlsConnector as NativeTlsConnector};
|
||||
#[cfg(all(feature = "tls", not(feature = "alpn")))]
|
||||
use tokio_tls::{TlsConnector};
|
||||
use {
|
||||
native_tls::{Error as SslError, TlsConnector as NativeTlsConnector},
|
||||
tokio_tls::TlsConnector as SslConnector,
|
||||
};
|
||||
|
||||
#[cfg(
|
||||
all(
|
||||
feature = "rust-tls",
|
||||
not(any(feature = "alpn", feature = "tls"))
|
||||
)
|
||||
)]
|
||||
use rustls::ClientConfig;
|
||||
#[cfg(
|
||||
all(
|
||||
feature = "rust-tls",
|
||||
not(any(feature = "alpn", feature = "tls"))
|
||||
)
|
||||
)]
|
||||
use std::io::Error as TLSError;
|
||||
#[cfg(
|
||||
all(
|
||||
feature = "rust-tls",
|
||||
not(any(feature = "alpn", feature = "tls"))
|
||||
)
|
||||
)]
|
||||
use std::sync::Arc;
|
||||
#[cfg(
|
||||
all(
|
||||
feature = "rust-tls",
|
||||
not(any(feature = "alpn", feature = "tls"))
|
||||
)
|
||||
)]
|
||||
use tokio_rustls::ClientConfigExt;
|
||||
#[cfg(
|
||||
all(
|
||||
feature = "rust-tls",
|
||||
not(any(feature = "alpn", feature = "tls"))
|
||||
)
|
||||
)]
|
||||
use webpki::DNSNameRef;
|
||||
#[cfg(
|
||||
all(
|
||||
feature = "rust-tls",
|
||||
not(any(feature = "alpn", feature = "tls"))
|
||||
)
|
||||
)]
|
||||
use webpki_roots;
|
||||
#[cfg(all(
|
||||
feature = "rust-tls",
|
||||
not(any(feature = "alpn", feature = "tls"))
|
||||
))]
|
||||
use {
|
||||
rustls::ClientConfig, std::io::Error as SslError, std::sync::Arc,
|
||||
tokio_rustls::ClientConfigExt, webpki::DNSNameRef, webpki_roots,
|
||||
};
|
||||
|
||||
#[cfg(all(
|
||||
feature = "rust-tls",
|
||||
not(any(feature = "alpn", feature = "tls"))
|
||||
))]
|
||||
type SslConnector = Arc<ClientConfig>;
|
||||
|
||||
#[cfg(not(any(feature = "alpn", feature = "tls", feature = "rust-tls")))]
|
||||
type SslConnector = ();
|
||||
|
||||
use server::IoStream;
|
||||
use {HAS_OPENSSL, HAS_RUSTLS, HAS_TLS};
|
||||
@@ -173,24 +150,9 @@ pub enum ClientConnectorError {
|
||||
SslIsNotSupported,
|
||||
|
||||
/// SSL error
|
||||
#[cfg(feature = "alpn")]
|
||||
#[cfg(any(feature = "tls", feature = "alpn", feature = "rust-tls"))]
|
||||
#[fail(display = "{}", _0)]
|
||||
SslError(#[cause] OpensslError),
|
||||
|
||||
/// SSL error
|
||||
#[cfg(all(feature = "tls", not(feature = "alpn")))]
|
||||
#[fail(display = "{}", _0)]
|
||||
SslError(#[cause] TlsError),
|
||||
|
||||
/// SSL error
|
||||
#[cfg(
|
||||
all(
|
||||
feature = "rust-tls",
|
||||
not(any(feature = "alpn", feature = "tls"))
|
||||
)
|
||||
)]
|
||||
#[fail(display = "{}", _0)]
|
||||
SslError(#[cause] TLSError),
|
||||
SslError(#[cause] SslError),
|
||||
|
||||
/// Resolver error
|
||||
#[fail(display = "{}", _0)]
|
||||
@@ -242,17 +204,8 @@ impl Paused {
|
||||
/// `ClientConnector` type is responsible for transport layer of a
|
||||
/// client connection.
|
||||
pub struct ClientConnector {
|
||||
#[cfg(all(feature = "alpn"))]
|
||||
#[allow(dead_code)]
|
||||
connector: SslConnector,
|
||||
#[cfg(all(feature = "tls", not(feature = "alpn")))]
|
||||
connector: TlsConnector,
|
||||
#[cfg(
|
||||
all(
|
||||
feature = "rust-tls",
|
||||
not(any(feature = "alpn", feature = "tls"))
|
||||
)
|
||||
)]
|
||||
connector: Arc<ClientConfig>,
|
||||
|
||||
stats: ClientConnectorStats,
|
||||
subscriber: Option<Recipient<ClientConnectorStats>>,
|
||||
@@ -293,71 +246,36 @@ impl SystemService for ClientConnector {}
|
||||
|
||||
impl Default for ClientConnector {
|
||||
fn default() -> ClientConnector {
|
||||
#[cfg(all(feature = "alpn"))]
|
||||
{
|
||||
let builder = SslConnector::builder(SslMethod::tls()).unwrap();
|
||||
ClientConnector::with_connector(builder.build())
|
||||
}
|
||||
#[cfg(all(feature = "tls", not(feature = "alpn")))]
|
||||
{
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
let builder = NativeTlsConnector::builder();
|
||||
ClientConnector {
|
||||
stats: ClientConnectorStats::default(),
|
||||
subscriber: None,
|
||||
acq_tx: tx,
|
||||
acq_rx: Some(rx),
|
||||
resolver: None,
|
||||
connector: builder.build().unwrap().into(),
|
||||
conn_lifetime: Duration::from_secs(75),
|
||||
conn_keep_alive: Duration::from_secs(15),
|
||||
limit: 100,
|
||||
limit_per_host: 0,
|
||||
acquired: 0,
|
||||
acquired_per_host: HashMap::new(),
|
||||
available: HashMap::new(),
|
||||
to_close: Vec::new(),
|
||||
waiters: Some(HashMap::new()),
|
||||
wait_timeout: None,
|
||||
paused: Paused::No,
|
||||
let connector = {
|
||||
#[cfg(all(feature = "alpn"))]
|
||||
{
|
||||
SslConnector::builder(SslMethod::tls()).unwrap().build()
|
||||
}
|
||||
}
|
||||
#[cfg(
|
||||
all(
|
||||
|
||||
#[cfg(all(feature = "tls", not(feature = "alpn")))]
|
||||
{
|
||||
NativeTlsConnector::builder().build().unwrap().into()
|
||||
}
|
||||
|
||||
#[cfg(all(
|
||||
feature = "rust-tls",
|
||||
not(any(feature = "alpn", feature = "tls"))
|
||||
)
|
||||
)]
|
||||
{
|
||||
let mut config = ClientConfig::new();
|
||||
config
|
||||
.root_store
|
||||
.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
|
||||
ClientConnector::with_connector(config)
|
||||
}
|
||||
|
||||
#[cfg(not(any(feature = "alpn", feature = "tls", feature = "rust-tls")))]
|
||||
{
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
ClientConnector {
|
||||
stats: ClientConnectorStats::default(),
|
||||
subscriber: None,
|
||||
acq_tx: tx,
|
||||
acq_rx: Some(rx),
|
||||
resolver: None,
|
||||
conn_lifetime: Duration::from_secs(75),
|
||||
conn_keep_alive: Duration::from_secs(15),
|
||||
limit: 100,
|
||||
limit_per_host: 0,
|
||||
acquired: 0,
|
||||
acquired_per_host: HashMap::new(),
|
||||
available: HashMap::new(),
|
||||
to_close: Vec::new(),
|
||||
waiters: Some(HashMap::new()),
|
||||
wait_timeout: None,
|
||||
paused: Paused::No,
|
||||
))]
|
||||
{
|
||||
let mut config = ClientConfig::new();
|
||||
config
|
||||
.root_store
|
||||
.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
|
||||
Arc::new(config)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(any(feature = "alpn", feature = "tls", feature = "rust-tls")))]
|
||||
{
|
||||
()
|
||||
}
|
||||
};
|
||||
|
||||
ClientConnector::with_connector_impl(connector)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -375,7 +293,6 @@ impl ClientConnector {
|
||||
/// # extern crate futures;
|
||||
/// # use futures::{future, Future};
|
||||
/// # use std::io::Write;
|
||||
/// # use std::process;
|
||||
/// # use actix_web::actix::Actor;
|
||||
/// extern crate openssl;
|
||||
/// use actix_web::{actix, client::ClientConnector, client::Connect};
|
||||
@@ -402,35 +319,14 @@ impl ClientConnector {
|
||||
/// }
|
||||
/// ```
|
||||
pub fn with_connector(connector: SslConnector) -> ClientConnector {
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
|
||||
ClientConnector {
|
||||
connector,
|
||||
stats: ClientConnectorStats::default(),
|
||||
subscriber: None,
|
||||
acq_tx: tx,
|
||||
acq_rx: Some(rx),
|
||||
resolver: None,
|
||||
conn_lifetime: Duration::from_secs(75),
|
||||
conn_keep_alive: Duration::from_secs(15),
|
||||
limit: 100,
|
||||
limit_per_host: 0,
|
||||
acquired: 0,
|
||||
acquired_per_host: HashMap::new(),
|
||||
available: HashMap::new(),
|
||||
to_close: Vec::new(),
|
||||
waiters: Some(HashMap::new()),
|
||||
wait_timeout: None,
|
||||
paused: Paused::No,
|
||||
}
|
||||
// keep level of indirection for docstrings matching featureflags
|
||||
Self::with_connector_impl(connector)
|
||||
}
|
||||
|
||||
#[cfg(
|
||||
all(
|
||||
feature = "rust-tls",
|
||||
not(any(feature = "alpn", feature = "tls"))
|
||||
)
|
||||
)]
|
||||
#[cfg(all(
|
||||
feature = "rust-tls",
|
||||
not(any(feature = "alpn", feature = "tls"))
|
||||
))]
|
||||
/// Create `ClientConnector` actor with custom `SslConnector` instance.
|
||||
///
|
||||
/// By default `ClientConnector` uses very a simple SSL configuration.
|
||||
@@ -441,10 +337,8 @@ impl ClientConnector {
|
||||
/// # #![cfg(feature = "rust-tls")]
|
||||
/// # extern crate actix_web;
|
||||
/// # extern crate futures;
|
||||
/// # extern crate tokio;
|
||||
/// # use futures::{future, Future};
|
||||
/// # use std::io::Write;
|
||||
/// # use std::process;
|
||||
/// # use actix_web::actix::Actor;
|
||||
/// extern crate rustls;
|
||||
/// extern crate webpki_roots;
|
||||
@@ -476,10 +370,61 @@ impl ClientConnector {
|
||||
/// }
|
||||
/// ```
|
||||
pub fn with_connector(connector: ClientConfig) -> ClientConnector {
|
||||
// keep level of indirection for docstrings matching featureflags
|
||||
Self::with_connector_impl(Arc::new(connector))
|
||||
}
|
||||
|
||||
#[cfg(all(
|
||||
feature = "tls",
|
||||
not(any(feature = "alpn", feature = "rust-tls"))
|
||||
))]
|
||||
/// Create `ClientConnector` actor with custom `SslConnector` instance.
|
||||
///
|
||||
/// By default `ClientConnector` uses very a simple SSL configuration.
|
||||
/// With `with_connector` method it is possible to use a custom
|
||||
/// `SslConnector` object.
|
||||
///
|
||||
/// ```rust
|
||||
/// # #![cfg(feature = "tls")]
|
||||
/// # extern crate actix_web;
|
||||
/// # extern crate futures;
|
||||
/// # use futures::{future, Future};
|
||||
/// # use std::io::Write;
|
||||
/// # use actix_web::actix::Actor;
|
||||
/// extern crate native_tls;
|
||||
/// extern crate webpki_roots;
|
||||
/// use native_tls::TlsConnector;
|
||||
/// use actix_web::{actix, client::ClientConnector, client::Connect};
|
||||
///
|
||||
/// fn main() {
|
||||
/// actix::run(|| {
|
||||
/// let connector = TlsConnector::new().unwrap();
|
||||
/// let conn = ClientConnector::with_connector(connector.into()).start();
|
||||
///
|
||||
/// conn.send(
|
||||
/// Connect::new("https://www.rust-lang.org").unwrap()) // <- connect to host
|
||||
/// .map_err(|_| ())
|
||||
/// .and_then(|res| {
|
||||
/// if let Ok(mut stream) = res {
|
||||
/// stream.write_all(b"GET / HTTP/1.0\r\n\r\n").unwrap();
|
||||
/// }
|
||||
/// # actix::System::current().stop();
|
||||
/// Ok(())
|
||||
/// })
|
||||
/// });
|
||||
/// }
|
||||
/// ```
|
||||
pub fn with_connector(connector: SslConnector) -> ClientConnector {
|
||||
// keep level of indirection for docstrings matching featureflags
|
||||
Self::with_connector_impl(connector)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn with_connector_impl(connector: SslConnector) -> ClientConnector {
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
|
||||
ClientConnector {
|
||||
connector: Arc::new(connector),
|
||||
connector,
|
||||
stats: ClientConnectorStats::default(),
|
||||
subscriber: None,
|
||||
acq_tx: tx,
|
||||
@@ -853,12 +798,10 @@ impl ClientConnector {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(
|
||||
all(
|
||||
feature = "rust-tls",
|
||||
not(any(feature = "alpn", feature = "tls"))
|
||||
)
|
||||
)]
|
||||
#[cfg(all(
|
||||
feature = "rust-tls",
|
||||
not(any(feature = "alpn", feature = "tls"))
|
||||
))]
|
||||
match res {
|
||||
Err(err) => {
|
||||
let _ = waiter.tx.send(Err(err.into()));
|
||||
@@ -1344,7 +1287,7 @@ impl AsyncWrite for Connection {
|
||||
}
|
||||
|
||||
#[cfg(feature = "tls")]
|
||||
use tokio_tls::{TlsStream};
|
||||
use tokio_tls::TlsStream;
|
||||
|
||||
#[cfg(feature = "tls")]
|
||||
/// This is temp solution untile actix-net migration
|
||||
@@ -1364,4 +1307,4 @@ impl<Io: IoStream> IoStream for TlsStream<Io> {
|
||||
fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()> {
|
||||
self.get_mut().get_mut().set_linger(dur)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -20,6 +20,7 @@ const MAX_HEADERS: usize = 96;
|
||||
#[derive(Default)]
|
||||
pub struct HttpResponseParser {
|
||||
decoder: Option<EncodingDecoder>,
|
||||
eof: bool, // indicate that we read payload until stream eof
|
||||
}
|
||||
|
||||
#[derive(Debug, Fail)]
|
||||
@@ -38,43 +39,42 @@ impl HttpResponseParser {
|
||||
where
|
||||
T: IoStream,
|
||||
{
|
||||
// if buf is empty parse_message will always return NotReady, let's avoid that
|
||||
if buf.is_empty() {
|
||||
loop {
|
||||
// Don't call parser until we have data to parse.
|
||||
if !buf.is_empty() {
|
||||
match HttpResponseParser::parse_message(buf)
|
||||
.map_err(HttpResponseParserError::Error)?
|
||||
{
|
||||
Async::Ready((msg, info)) => {
|
||||
if let Some((decoder, eof)) = info {
|
||||
self.eof = eof;
|
||||
self.decoder = Some(decoder);
|
||||
} else {
|
||||
self.eof = false;
|
||||
self.decoder = None;
|
||||
}
|
||||
return Ok(Async::Ready(msg));
|
||||
}
|
||||
Async::NotReady => {
|
||||
if buf.capacity() >= MAX_BUFFER_SIZE {
|
||||
return Err(HttpResponseParserError::Error(
|
||||
ParseError::TooLarge,
|
||||
));
|
||||
}
|
||||
// Parser needs more data.
|
||||
}
|
||||
}
|
||||
}
|
||||
// Read some more data into the buffer for the parser.
|
||||
match io.read_available(buf) {
|
||||
Ok(Async::Ready(true)) => {
|
||||
Ok(Async::Ready((false, true))) => {
|
||||
return Err(HttpResponseParserError::Disconnect)
|
||||
}
|
||||
Ok(Async::Ready(false)) => (),
|
||||
Ok(Async::Ready(_)) => (),
|
||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||
Err(err) => return Err(HttpResponseParserError::Error(err.into())),
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
match HttpResponseParser::parse_message(buf)
|
||||
.map_err(HttpResponseParserError::Error)?
|
||||
{
|
||||
Async::Ready((msg, decoder)) => {
|
||||
self.decoder = decoder;
|
||||
return Ok(Async::Ready(msg));
|
||||
}
|
||||
Async::NotReady => {
|
||||
if buf.capacity() >= MAX_BUFFER_SIZE {
|
||||
return Err(HttpResponseParserError::Error(ParseError::TooLarge));
|
||||
}
|
||||
match io.read_available(buf) {
|
||||
Ok(Async::Ready(true)) => {
|
||||
return Err(HttpResponseParserError::Disconnect)
|
||||
}
|
||||
Ok(Async::Ready(false)) => (),
|
||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||
Err(err) => {
|
||||
return Err(HttpResponseParserError::Error(err.into()))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn parse_payload<T>(
|
||||
@@ -87,8 +87,8 @@ impl HttpResponseParser {
|
||||
loop {
|
||||
// read payload
|
||||
let (not_ready, stream_finished) = match io.read_available(buf) {
|
||||
Ok(Async::Ready(true)) => (false, true),
|
||||
Ok(Async::Ready(false)) => (false, false),
|
||||
Ok(Async::Ready((_, true))) => (false, true),
|
||||
Ok(Async::Ready((_, false))) => (false, false),
|
||||
Ok(Async::NotReady) => (true, false),
|
||||
Err(err) => return Err(err.into()),
|
||||
};
|
||||
@@ -104,7 +104,12 @@ impl HttpResponseParser {
|
||||
return Ok(Async::NotReady);
|
||||
}
|
||||
if stream_finished {
|
||||
return Err(PayloadError::Incomplete);
|
||||
// read untile eof?
|
||||
if self.eof {
|
||||
return Ok(Async::Ready(None));
|
||||
} else {
|
||||
return Err(PayloadError::Incomplete);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => return Err(err.into()),
|
||||
@@ -117,7 +122,7 @@ impl HttpResponseParser {
|
||||
|
||||
fn parse_message(
|
||||
buf: &mut BytesMut,
|
||||
) -> Poll<(ClientResponse, Option<EncodingDecoder>), ParseError> {
|
||||
) -> Poll<(ClientResponse, Option<(EncodingDecoder, bool)>), ParseError> {
|
||||
// Unsafe: we read only this data only after httparse parses headers into.
|
||||
// performance bump for pipeline benchmarks.
|
||||
let mut headers: [HeaderIndex; MAX_HEADERS] = unsafe { mem::uninitialized() };
|
||||
@@ -163,12 +168,12 @@ impl HttpResponseParser {
|
||||
}
|
||||
|
||||
let decoder = if status == StatusCode::SWITCHING_PROTOCOLS {
|
||||
Some(EncodingDecoder::eof())
|
||||
Some((EncodingDecoder::eof(), true))
|
||||
} else if let Some(len) = hdrs.get(header::CONTENT_LENGTH) {
|
||||
// Content-Length
|
||||
if let Ok(s) = len.to_str() {
|
||||
if let Ok(len) = s.parse::<u64>() {
|
||||
Some(EncodingDecoder::length(len))
|
||||
Some((EncodingDecoder::length(len), false))
|
||||
} else {
|
||||
debug!("illegal Content-Length: {:?}", len);
|
||||
return Err(ParseError::Header);
|
||||
@@ -179,7 +184,18 @@ impl HttpResponseParser {
|
||||
}
|
||||
} else if chunked(&hdrs)? {
|
||||
// Chunked encoding
|
||||
Some(EncodingDecoder::chunked())
|
||||
Some((EncodingDecoder::chunked(), false))
|
||||
} else if let Some(value) = hdrs.get(header::CONNECTION) {
|
||||
let close = if let Ok(s) = value.to_str() {
|
||||
s == "close"
|
||||
} else {
|
||||
false
|
||||
};
|
||||
if close {
|
||||
Some((EncodingDecoder::eof(), true))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
@@ -254,16 +254,16 @@ impl ClientRequest {
|
||||
|
||||
impl fmt::Debug for ClientRequest {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
let res = writeln!(
|
||||
writeln!(
|
||||
f,
|
||||
"\nClientRequest {:?} {}:{}",
|
||||
self.version, self.method, self.uri
|
||||
);
|
||||
let _ = writeln!(f, " headers:");
|
||||
)?;
|
||||
writeln!(f, " headers:")?;
|
||||
for (key, val) in self.headers.iter() {
|
||||
let _ = writeln!(f, " {:?}: {:?}", key, val);
|
||||
writeln!(f, " {:?}: {:?}", key, val)?;
|
||||
}
|
||||
res
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -750,16 +750,16 @@ fn parts<'a>(
|
||||
impl fmt::Debug for ClientRequestBuilder {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
if let Some(ref parts) = self.request {
|
||||
let res = writeln!(
|
||||
writeln!(
|
||||
f,
|
||||
"\nClientRequestBuilder {:?} {}:{}",
|
||||
parts.version, parts.method, parts.uri
|
||||
);
|
||||
let _ = writeln!(f, " headers:");
|
||||
)?;
|
||||
writeln!(f, " headers:")?;
|
||||
for (key, val) in parts.headers.iter() {
|
||||
let _ = writeln!(f, " {:?}: {:?}", key, val);
|
||||
writeln!(f, " {:?}: {:?}", key, val)?;
|
||||
}
|
||||
res
|
||||
Ok(())
|
||||
} else {
|
||||
write!(f, "ClientRequestBuilder(Consumed)")
|
||||
}
|
||||
|
@@ -95,12 +95,12 @@ impl ClientResponse {
|
||||
|
||||
impl fmt::Debug for ClientResponse {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
let res = writeln!(f, "\nClientResponse {:?} {}", self.version(), self.status());
|
||||
let _ = writeln!(f, " headers:");
|
||||
writeln!(f, "\nClientResponse {:?} {}", self.version(), self.status())?;
|
||||
writeln!(f, " headers:")?;
|
||||
for (key, val) in self.headers().iter() {
|
||||
let _ = writeln!(f, " {:?}: {:?}", key, val);
|
||||
writeln!(f, " {:?}: {:?}", key, val)?;
|
||||
}
|
||||
res
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -175,7 +175,7 @@ impl<'de, S: 'de> Deserializer<'de> for PathDeserializer<'de, S> {
|
||||
parse_single_value!(deserialize_bool, visit_bool, "bool");
|
||||
parse_single_value!(deserialize_i8, visit_i8, "i8");
|
||||
parse_single_value!(deserialize_i16, visit_i16, "i16");
|
||||
parse_single_value!(deserialize_i32, visit_i32, "i16");
|
||||
parse_single_value!(deserialize_i32, visit_i32, "i32");
|
||||
parse_single_value!(deserialize_i64, visit_i64, "i64");
|
||||
parse_single_value!(deserialize_u8, visit_u8, "u8");
|
||||
parse_single_value!(deserialize_u16, visit_u16, "u16");
|
||||
|
@@ -354,24 +354,24 @@ impl<S> FromRequest<S> for HttpRequest<S> {
|
||||
|
||||
impl<S> fmt::Debug for HttpRequest<S> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
let res = writeln!(
|
||||
writeln!(
|
||||
f,
|
||||
"\nHttpRequest {:?} {}:{}",
|
||||
self.version(),
|
||||
self.method(),
|
||||
self.path()
|
||||
);
|
||||
)?;
|
||||
if !self.query_string().is_empty() {
|
||||
let _ = writeln!(f, " query: ?{:?}", self.query_string());
|
||||
writeln!(f, " query: ?{:?}", self.query_string())?;
|
||||
}
|
||||
if !self.match_info().is_empty() {
|
||||
let _ = writeln!(f, " params: {:?}", self.match_info());
|
||||
writeln!(f, " params: {:?}", self.match_info())?;
|
||||
}
|
||||
let _ = writeln!(f, " headers:");
|
||||
writeln!(f, " headers:")?;
|
||||
for (key, val) in self.headers().iter() {
|
||||
let _ = writeln!(f, " {:?}: {:?}", key, val);
|
||||
writeln!(f, " {:?}: {:?}", key, val)?;
|
||||
}
|
||||
res
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -118,6 +118,7 @@ extern crate parking_lot;
|
||||
extern crate rand;
|
||||
extern crate slab;
|
||||
extern crate tokio;
|
||||
extern crate tokio_current_thread;
|
||||
extern crate tokio_io;
|
||||
extern crate tokio_reactor;
|
||||
extern crate tokio_tcp;
|
||||
|
@@ -270,14 +270,17 @@ impl<S: 'static, T: SessionBackend<S>> Middleware<S> for SessionStorage<T, S> {
|
||||
}
|
||||
|
||||
/// A simple key-value storage interface that is internally used by `Session`.
|
||||
#[doc(hidden)]
|
||||
pub trait SessionImpl: 'static {
|
||||
/// Get session value by key
|
||||
fn get(&self, key: &str) -> Option<&str>;
|
||||
|
||||
/// Set session value
|
||||
fn set(&mut self, key: &str, value: String);
|
||||
|
||||
/// Remove specific key from session
|
||||
fn remove(&mut self, key: &str);
|
||||
|
||||
/// Remove all values from session
|
||||
fn clear(&mut self);
|
||||
|
||||
/// Write session to storage backend.
|
||||
@@ -285,9 +288,10 @@ pub trait SessionImpl: 'static {
|
||||
}
|
||||
|
||||
/// Session's storage backend trait definition.
|
||||
#[doc(hidden)]
|
||||
pub trait SessionBackend<S>: Sized + 'static {
|
||||
/// Session item
|
||||
type Session: SessionImpl;
|
||||
/// Future that reads session
|
||||
type ReadFuture: Future<Item = Self::Session, Error = Error>;
|
||||
|
||||
/// Parse the session from request and load data from a storage backend.
|
||||
|
@@ -441,13 +441,13 @@ where
|
||||
|
||||
impl<S> fmt::Debug for Field<S> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
let res = writeln!(f, "\nMultipartField: {}", self.ct);
|
||||
let _ = writeln!(f, " boundary: {}", self.inner.borrow().boundary);
|
||||
let _ = writeln!(f, " headers:");
|
||||
writeln!(f, "\nMultipartField: {}", self.ct)?;
|
||||
writeln!(f, " boundary: {}", self.inner.borrow().boundary)?;
|
||||
writeln!(f, " headers:")?;
|
||||
for (key, val) in self.headers.iter() {
|
||||
let _ = writeln!(f, " {:?}: {:?}", key, val);
|
||||
writeln!(f, " {:?}: {:?}", key, val)?;
|
||||
}
|
||||
res
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -236,7 +236,6 @@ macro_rules! FROM_STR {
|
||||
($type:ty) => {
|
||||
impl FromParam for $type {
|
||||
type Err = InternalError<<$type as FromStr>::Err>;
|
||||
|
||||
fn from_param(val: &str) -> Result<Self, Self::Err> {
|
||||
<$type as FromStr>::from_str(val)
|
||||
.map_err(|e| InternalError::new(e, StatusCode::BAD_REQUEST))
|
||||
|
133
src/router.rs
133
src/router.rs
@@ -815,73 +815,70 @@ impl ResourceDef {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn parse(
|
||||
pattern: &str, for_prefix: bool,
|
||||
) -> (String, Vec<PatternElement>, bool, usize) {
|
||||
fn parse_param(pattern: &str) -> (PatternElement, String, &str) {
|
||||
const DEFAULT_PATTERN: &str = "[^/]+";
|
||||
|
||||
let mut re1 = String::from("^");
|
||||
let mut re2 = String::new();
|
||||
let mut el = String::new();
|
||||
let mut in_param = false;
|
||||
let mut in_param_pattern = false;
|
||||
let mut param_name = String::new();
|
||||
let mut param_pattern = String::from(DEFAULT_PATTERN);
|
||||
let mut is_dynamic = false;
|
||||
let mut elems = Vec::new();
|
||||
let mut len = 0;
|
||||
|
||||
for ch in pattern.chars() {
|
||||
if in_param {
|
||||
// In parameter segment: `{....}`
|
||||
if ch == '}' {
|
||||
elems.push(PatternElement::Var(param_name.clone()));
|
||||
re1.push_str(&format!(r"(?P<{}>{})", ¶m_name, ¶m_pattern));
|
||||
|
||||
param_name.clear();
|
||||
param_pattern = String::from(DEFAULT_PATTERN);
|
||||
|
||||
len = 0;
|
||||
in_param_pattern = false;
|
||||
in_param = false;
|
||||
} else if ch == ':' {
|
||||
// The parameter name has been determined; custom pattern land
|
||||
in_param_pattern = true;
|
||||
param_pattern.clear();
|
||||
} else if in_param_pattern {
|
||||
// Ignore leading whitespace for pattern
|
||||
if !(ch == ' ' && param_pattern.is_empty()) {
|
||||
param_pattern.push(ch);
|
||||
}
|
||||
} else {
|
||||
param_name.push(ch);
|
||||
let mut params_nesting = 0usize;
|
||||
let close_idx = pattern
|
||||
.find(|c| match c {
|
||||
'{' => {
|
||||
params_nesting += 1;
|
||||
false
|
||||
}
|
||||
} else if ch == '{' {
|
||||
in_param = true;
|
||||
is_dynamic = true;
|
||||
elems.push(PatternElement::Str(el.clone()));
|
||||
el.clear();
|
||||
} else {
|
||||
re1.push_str(escape(&ch.to_string()).as_str());
|
||||
re2.push(ch);
|
||||
el.push(ch);
|
||||
len += 1;
|
||||
'}' => {
|
||||
params_nesting -= 1;
|
||||
params_nesting == 0
|
||||
}
|
||||
_ => false,
|
||||
}).expect("malformed param");
|
||||
let (mut param, rem) = pattern.split_at(close_idx + 1);
|
||||
param = ¶m[1..param.len() - 1]; // Remove outer brackets
|
||||
let (name, pattern) = match param.find(':') {
|
||||
Some(idx) => {
|
||||
let (name, pattern) = param.split_at(idx);
|
||||
(name, &pattern[1..])
|
||||
}
|
||||
}
|
||||
|
||||
if !el.is_empty() {
|
||||
elems.push(PatternElement::Str(el.clone()));
|
||||
}
|
||||
|
||||
let re = if is_dynamic {
|
||||
if !for_prefix {
|
||||
re1.push('$');
|
||||
}
|
||||
re1
|
||||
} else {
|
||||
re2
|
||||
None => (param, DEFAULT_PATTERN),
|
||||
};
|
||||
(re, elems, is_dynamic, len)
|
||||
(
|
||||
PatternElement::Var(name.to_string()),
|
||||
format!(r"(?P<{}>{})", &name, &pattern),
|
||||
rem,
|
||||
)
|
||||
}
|
||||
|
||||
fn parse(
|
||||
mut pattern: &str, for_prefix: bool,
|
||||
) -> (String, Vec<PatternElement>, bool, usize) {
|
||||
if pattern.find('{').is_none() {
|
||||
return (
|
||||
String::from(pattern),
|
||||
vec![PatternElement::Str(String::from(pattern))],
|
||||
false,
|
||||
pattern.chars().count(),
|
||||
);
|
||||
};
|
||||
|
||||
let mut elems = Vec::new();
|
||||
let mut re = String::from("^");
|
||||
|
||||
while let Some(idx) = pattern.find('{') {
|
||||
let (prefix, rem) = pattern.split_at(idx);
|
||||
elems.push(PatternElement::Str(String::from(prefix)));
|
||||
re.push_str(&escape(prefix));
|
||||
let (param_pattern, re_part, rem) = Self::parse_param(rem);
|
||||
elems.push(param_pattern);
|
||||
re.push_str(&re_part);
|
||||
pattern = rem;
|
||||
}
|
||||
|
||||
elems.push(PatternElement::Str(String::from(pattern)));
|
||||
re.push_str(&escape(pattern));
|
||||
|
||||
if !for_prefix {
|
||||
re.push_str("$");
|
||||
}
|
||||
|
||||
(re, elems, true, pattern.chars().count())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1072,6 +1069,16 @@ mod tests {
|
||||
let info = re.match_with_params(&req, 0).unwrap();
|
||||
assert_eq!(info.get("version").unwrap(), "151");
|
||||
assert_eq!(info.get("id").unwrap(), "adahg32");
|
||||
|
||||
let re = ResourceDef::new("/{id:[[:digit:]]{6}}");
|
||||
assert!(re.is_match("/012345"));
|
||||
assert!(!re.is_match("/012"));
|
||||
assert!(!re.is_match("/01234567"));
|
||||
assert!(!re.is_match("/XXXXXX"));
|
||||
|
||||
let req = TestRequest::with_uri("/012345").finish();
|
||||
let info = re.match_with_params(&req, 0).unwrap();
|
||||
assert_eq!(info.get("id").unwrap(), "012345");
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
132
src/scope.rs
132
src/scope.rs
@@ -5,7 +5,10 @@ use std::rc::Rc;
|
||||
use futures::{Async, Future, Poll};
|
||||
|
||||
use error::Error;
|
||||
use handler::{AsyncResult, AsyncResultItem, FromRequest, Responder, RouteHandler};
|
||||
use handler::{
|
||||
AsyncResult, AsyncResultItem, FromRequest, Handler, Responder, RouteHandler,
|
||||
WrapHandler,
|
||||
};
|
||||
use http::Method;
|
||||
use httprequest::HttpRequest;
|
||||
use httpresponse::HttpResponse;
|
||||
@@ -180,7 +183,7 @@ impl<S: 'static> Scope<S> {
|
||||
where
|
||||
F: FnOnce(Scope<S>) -> Scope<S>,
|
||||
{
|
||||
let rdef = ResourceDef::prefix(&path);
|
||||
let rdef = ResourceDef::prefix(&insert_slash(path));
|
||||
let scope = Scope {
|
||||
rdef: rdef.clone(),
|
||||
filters: Vec::new(),
|
||||
@@ -227,9 +230,11 @@ impl<S: 'static> Scope<S> {
|
||||
R: Responder + 'static,
|
||||
T: FromRequest<S> + 'static,
|
||||
{
|
||||
Rc::get_mut(&mut self.router)
|
||||
.unwrap()
|
||||
.register_route(path, method, f);
|
||||
Rc::get_mut(&mut self.router).unwrap().register_route(
|
||||
&insert_slash(path),
|
||||
method,
|
||||
f,
|
||||
);
|
||||
self
|
||||
}
|
||||
|
||||
@@ -261,7 +266,7 @@ impl<S: 'static> Scope<S> {
|
||||
F: FnOnce(&mut Resource<S>) -> R + 'static,
|
||||
{
|
||||
// add resource
|
||||
let mut resource = Resource::new(ResourceDef::new(path));
|
||||
let mut resource = Resource::new(ResourceDef::new(&insert_slash(path)));
|
||||
f(&mut resource);
|
||||
|
||||
Rc::get_mut(&mut self.router)
|
||||
@@ -286,6 +291,35 @@ impl<S: 'static> Scope<S> {
|
||||
self
|
||||
}
|
||||
|
||||
/// Configure handler for specific path prefix.
|
||||
///
|
||||
/// A path prefix consists of valid path segments, i.e for the
|
||||
/// prefix `/app` any request with the paths `/app`, `/app/` or
|
||||
/// `/app/test` would match, but the path `/application` would
|
||||
/// not.
|
||||
///
|
||||
/// ```rust
|
||||
/// # extern crate actix_web;
|
||||
/// use actix_web::{http, App, HttpRequest, HttpResponse};
|
||||
///
|
||||
/// fn main() {
|
||||
/// let app = App::new().scope("/scope-prefix", |scope| {
|
||||
/// scope.handler("/app", |req: &HttpRequest| match *req.method() {
|
||||
/// http::Method::GET => HttpResponse::Ok(),
|
||||
/// http::Method::POST => HttpResponse::MethodNotAllowed(),
|
||||
/// _ => HttpResponse::NotFound(),
|
||||
/// })
|
||||
/// });
|
||||
/// }
|
||||
/// ```
|
||||
pub fn handler<H: Handler<S>>(mut self, path: &str, handler: H) -> Scope<S> {
|
||||
let path = insert_slash(path.trim().trim_right_matches('/'));
|
||||
Rc::get_mut(&mut self.router)
|
||||
.expect("Multiple copies of scope router")
|
||||
.register_handler(&path, Box::new(WrapHandler::new(handler)), None);
|
||||
self
|
||||
}
|
||||
|
||||
/// Register a scope middleware
|
||||
///
|
||||
/// This is similar to `App's` middlewares, but
|
||||
@@ -301,6 +335,14 @@ impl<S: 'static> Scope<S> {
|
||||
}
|
||||
}
|
||||
|
||||
fn insert_slash(path: &str) -> String {
|
||||
let mut path = path.to_owned();
|
||||
if !path.is_empty() && !path.starts_with('/') {
|
||||
path.insert(0, '/');
|
||||
};
|
||||
path
|
||||
}
|
||||
|
||||
impl<S: 'static> RouteHandler<S> for Scope<S> {
|
||||
fn handle(&self, req: &HttpRequest<S>) -> AsyncResult<HttpResponse> {
|
||||
let tail = req.match_info().tail as usize;
|
||||
@@ -779,11 +821,37 @@ mod tests {
|
||||
scope
|
||||
.route("/path1", Method::GET, |_: HttpRequest<_>| {
|
||||
HttpResponse::Ok()
|
||||
}).route(
|
||||
"/path1",
|
||||
Method::DELETE,
|
||||
|_: HttpRequest<_>| HttpResponse::Ok(),
|
||||
)
|
||||
}).route("/path1", Method::DELETE, |_: HttpRequest<_>| {
|
||||
HttpResponse::Ok()
|
||||
})
|
||||
}).finish();
|
||||
|
||||
let req = TestRequest::with_uri("/app/path1").request();
|
||||
let resp = app.run(req);
|
||||
assert_eq!(resp.as_msg().status(), StatusCode::OK);
|
||||
|
||||
let req = TestRequest::with_uri("/app/path1")
|
||||
.method(Method::DELETE)
|
||||
.request();
|
||||
let resp = app.run(req);
|
||||
assert_eq!(resp.as_msg().status(), StatusCode::OK);
|
||||
|
||||
let req = TestRequest::with_uri("/app/path1")
|
||||
.method(Method::POST)
|
||||
.request();
|
||||
let resp = app.run(req);
|
||||
assert_eq!(resp.as_msg().status(), StatusCode::NOT_FOUND);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_scope_route_without_leading_slash() {
|
||||
let app = App::new()
|
||||
.scope("app", |scope| {
|
||||
scope
|
||||
.route("path1", Method::GET, |_: HttpRequest<_>| HttpResponse::Ok())
|
||||
.route("path1", Method::DELETE, |_: HttpRequest<_>| {
|
||||
HttpResponse::Ok()
|
||||
})
|
||||
}).finish();
|
||||
|
||||
let req = TestRequest::with_uri("/app/path1").request();
|
||||
@@ -972,6 +1040,20 @@ mod tests {
|
||||
assert_eq!(resp.as_msg().status(), StatusCode::CREATED);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_nested_scope_no_slash() {
|
||||
let app = App::new()
|
||||
.scope("/app", |scope| {
|
||||
scope.nested("t1", |scope| {
|
||||
scope.resource("/path1", |r| r.f(|_| HttpResponse::Created()))
|
||||
})
|
||||
}).finish();
|
||||
|
||||
let req = TestRequest::with_uri("/app/t1/path1").request();
|
||||
let resp = app.run(req);
|
||||
assert_eq!(resp.as_msg().status(), StatusCode::CREATED);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_nested_scope_root() {
|
||||
let app = App::new()
|
||||
@@ -1120,4 +1202,32 @@ mod tests {
|
||||
let resp = app.run(req);
|
||||
assert_eq!(resp.as_msg().status(), StatusCode::METHOD_NOT_ALLOWED);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_handler() {
|
||||
let app = App::new()
|
||||
.scope("/scope", |scope| {
|
||||
scope.handler("/test", |_: &_| HttpResponse::Ok())
|
||||
}).finish();
|
||||
|
||||
let req = TestRequest::with_uri("/scope/test").request();
|
||||
let resp = app.run(req);
|
||||
assert_eq!(resp.as_msg().status(), StatusCode::OK);
|
||||
|
||||
let req = TestRequest::with_uri("/scope/test/").request();
|
||||
let resp = app.run(req);
|
||||
assert_eq!(resp.as_msg().status(), StatusCode::OK);
|
||||
|
||||
let req = TestRequest::with_uri("/scope/test/app").request();
|
||||
let resp = app.run(req);
|
||||
assert_eq!(resp.as_msg().status(), StatusCode::OK);
|
||||
|
||||
let req = TestRequest::with_uri("/scope/testapp").request();
|
||||
let resp = app.run(req);
|
||||
assert_eq!(resp.as_msg().status(), StatusCode::NOT_FOUND);
|
||||
|
||||
let req = TestRequest::with_uri("/scope/blah").request();
|
||||
let resp = app.run(req);
|
||||
assert_eq!(resp.as_msg().status(), StatusCode::NOT_FOUND);
|
||||
}
|
||||
}
|
||||
|
@@ -451,10 +451,13 @@ impl Accept {
|
||||
Delay::new(
|
||||
Instant::now() + Duration::from_millis(510),
|
||||
).map_err(|_| ())
|
||||
.and_then(move |_| {
|
||||
let _ = r.set_readiness(mio::Ready::readable());
|
||||
Ok(())
|
||||
}),
|
||||
.and_then(
|
||||
move |_| {
|
||||
let _ =
|
||||
r.set_readiness(mio::Ready::readable());
|
||||
Ok(())
|
||||
},
|
||||
),
|
||||
);
|
||||
Ok(())
|
||||
},
|
||||
|
@@ -5,6 +5,7 @@ use std::{io, ptr, time};
|
||||
use bytes::{Buf, BufMut, BytesMut};
|
||||
use futures::{Async, Future, Poll};
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use tokio_timer::Delay;
|
||||
|
||||
use super::settings::WorkerSettings;
|
||||
use super::{h1, h2, ConnectionTag, HttpHandler, IoStream};
|
||||
@@ -30,6 +31,7 @@ where
|
||||
{
|
||||
proto: Option<HttpProtocol<T, H>>,
|
||||
node: Option<Node<HttpChannel<T, H>>>,
|
||||
ka_timeout: Option<Delay>,
|
||||
_tag: ConnectionTag,
|
||||
}
|
||||
|
||||
@@ -42,9 +44,11 @@ where
|
||||
settings: Rc<WorkerSettings<H>>, io: T, peer: Option<SocketAddr>,
|
||||
) -> HttpChannel<T, H> {
|
||||
let _tag = settings.connection();
|
||||
let ka_timeout = settings.keep_alive_timer();
|
||||
|
||||
HttpChannel {
|
||||
_tag,
|
||||
ka_timeout,
|
||||
node: None,
|
||||
proto: Some(HttpProtocol::Unknown(
|
||||
settings,
|
||||
@@ -55,7 +59,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
fn shutdown(&mut self) {
|
||||
pub(crate) fn shutdown(&mut self) {
|
||||
match self.proto {
|
||||
Some(HttpProtocol::H1(ref mut h1)) => {
|
||||
let io = h1.io();
|
||||
@@ -68,6 +72,18 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, H> Drop for HttpChannel<T, H>
|
||||
where
|
||||
T: IoStream,
|
||||
H: HttpHandler + 'static,
|
||||
{
|
||||
fn drop(&mut self) {
|
||||
if let Some(mut node) = self.node.take() {
|
||||
node.remove()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, H> Future for HttpChannel<T, H>
|
||||
where
|
||||
T: IoStream,
|
||||
@@ -77,7 +93,19 @@ where
|
||||
type Error = ();
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
if self.node.is_some() {
|
||||
// keep-alive timer
|
||||
if let Some(ref mut timer) = self.ka_timeout {
|
||||
match timer.poll() {
|
||||
Ok(Async::Ready(_)) => {
|
||||
trace!("Slow request timed out, close connection");
|
||||
return Ok(Async::Ready(()));
|
||||
}
|
||||
Ok(Async::NotReady) => (),
|
||||
Err(_) => panic!("Something is really wrong"),
|
||||
}
|
||||
}
|
||||
|
||||
if self.node.is_none() {
|
||||
let el = self as *mut _;
|
||||
self.node = Some(Node::new(el));
|
||||
let _ = match self.proto {
|
||||
@@ -94,41 +122,32 @@ where
|
||||
};
|
||||
}
|
||||
|
||||
let mut is_eof = false;
|
||||
let kind = match self.proto {
|
||||
Some(HttpProtocol::H1(ref mut h1)) => {
|
||||
let result = h1.poll();
|
||||
match result {
|
||||
Ok(Async::Ready(())) | Err(_) => {
|
||||
if let Some(n) = self.node.as_mut() {
|
||||
n.remove()
|
||||
};
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
return result;
|
||||
return h1.poll();
|
||||
}
|
||||
Some(HttpProtocol::H2(ref mut h2)) => {
|
||||
let result = h2.poll();
|
||||
match result {
|
||||
Ok(Async::Ready(())) | Err(_) => {
|
||||
if let Some(n) = self.node.as_mut() {
|
||||
n.remove()
|
||||
};
|
||||
return h2.poll();
|
||||
}
|
||||
Some(HttpProtocol::Unknown(_, _, ref mut io, ref mut buf)) => {
|
||||
let mut disconnect = false;
|
||||
match io.read_available(buf) {
|
||||
Ok(Async::Ready((read_some, stream_closed))) => {
|
||||
is_eof = stream_closed;
|
||||
// Only disconnect if no data was read.
|
||||
if is_eof && !read_some {
|
||||
disconnect = true;
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
disconnect = true;
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
return result;
|
||||
}
|
||||
Some(HttpProtocol::Unknown(_, _, ref mut io, ref mut buf)) => {
|
||||
match io.read_available(buf) {
|
||||
Ok(Async::Ready(true)) | Err(_) => {
|
||||
debug!("Ignored premature client disconnection");
|
||||
if let Some(n) = self.node.as_mut() {
|
||||
n.remove()
|
||||
};
|
||||
return Err(());
|
||||
}
|
||||
_ => (),
|
||||
if disconnect {
|
||||
debug!("Ignored premature client disconnection");
|
||||
return Err(());
|
||||
}
|
||||
|
||||
if buf.len() >= 14 {
|
||||
@@ -148,8 +167,14 @@ where
|
||||
if let Some(HttpProtocol::Unknown(settings, addr, io, buf)) = self.proto.take() {
|
||||
match kind {
|
||||
ProtocolKind::Http1 => {
|
||||
self.proto =
|
||||
Some(HttpProtocol::H1(h1::Http1::new(settings, io, addr, buf)));
|
||||
self.proto = Some(HttpProtocol::H1(h1::Http1::new(
|
||||
settings,
|
||||
io,
|
||||
addr,
|
||||
buf,
|
||||
is_eof,
|
||||
self.ka_timeout.take(),
|
||||
)));
|
||||
return self.poll();
|
||||
}
|
||||
ProtocolKind::Http2 => {
|
||||
@@ -158,6 +183,7 @@ where
|
||||
io,
|
||||
addr,
|
||||
buf.freeze(),
|
||||
self.ka_timeout.take(),
|
||||
)));
|
||||
return self.poll();
|
||||
}
|
||||
@@ -182,13 +208,14 @@ impl<T> Node<T> {
|
||||
}
|
||||
}
|
||||
|
||||
fn insert<I>(&mut self, next: &mut Node<I>) {
|
||||
fn insert<I>(&mut self, next_el: &mut Node<I>) {
|
||||
unsafe {
|
||||
let next: *mut Node<T> = next as *const _ as *mut _;
|
||||
let next: *mut Node<T> = next_el as *const _ as *mut _;
|
||||
|
||||
if let Some(ref mut next2) = self.next {
|
||||
if let Some(next2) = self.next {
|
||||
let n = next2.as_mut().unwrap();
|
||||
n.prev = Some(next);
|
||||
next_el.next = Some(next2 as *mut _);
|
||||
}
|
||||
self.next = Some(next);
|
||||
|
||||
@@ -201,11 +228,14 @@ impl<T> Node<T> {
|
||||
unsafe {
|
||||
self.element = ptr::null_mut();
|
||||
let next = self.next.take();
|
||||
let mut prev = self.prev.take();
|
||||
let prev = self.prev.take();
|
||||
|
||||
if let Some(ref mut prev) = prev {
|
||||
if let Some(prev) = prev {
|
||||
prev.as_mut().unwrap().next = next;
|
||||
}
|
||||
if let Some(next) = next {
|
||||
next.as_mut().unwrap().prev = prev;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -219,7 +249,7 @@ impl Node<()> {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn traverse<T, H>(&self)
|
||||
pub(crate) fn traverse<T, H, F: Fn(&mut HttpChannel<T, H>)>(&self, f: F)
|
||||
where
|
||||
T: IoStream,
|
||||
H: HttpHandler + 'static,
|
||||
@@ -234,7 +264,7 @@ impl Node<()> {
|
||||
if !n.element.is_null() {
|
||||
let ch: &mut HttpChannel<T, H> =
|
||||
&mut *(&mut *(n.element as *mut _) as *mut () as *mut _);
|
||||
ch.shutdown();
|
||||
f(ch);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
@@ -21,7 +21,12 @@ impl HttpHandlerTask for ServerError {
|
||||
bytes.reserve(helpers::STATUS_LINE_BUF_SIZE + 1);
|
||||
helpers::write_status_line(self.0, self.1.as_u16(), bytes);
|
||||
}
|
||||
// Convert Status Code to Reason.
|
||||
let reason = self.1.canonical_reason().unwrap_or("");
|
||||
io.buffer().extend_from_slice(reason.as_bytes());
|
||||
// No response body.
|
||||
io.buffer().extend_from_slice(b"\r\ncontent-length: 0\r\n");
|
||||
// date header
|
||||
io.set_date();
|
||||
Ok(Async::Ready(true))
|
||||
}
|
||||
|
142
src/server/h1.rs
142
src/server/h1.rs
@@ -22,13 +22,14 @@ use super::{HttpHandler, HttpHandlerTask, IoStream};
|
||||
const MAX_PIPELINED_MESSAGES: usize = 16;
|
||||
|
||||
bitflags! {
|
||||
struct Flags: u8 {
|
||||
const STARTED = 0b0000_0001;
|
||||
const ERROR = 0b0000_0010;
|
||||
const KEEPALIVE = 0b0000_0100;
|
||||
const SHUTDOWN = 0b0000_1000;
|
||||
const DISCONNECTED = 0b0001_0000;
|
||||
const POLLED = 0b0010_0000;
|
||||
pub struct Flags: u8 {
|
||||
const STARTED = 0b0000_0001;
|
||||
const ERROR = 0b0000_0010;
|
||||
const KEEPALIVE = 0b0000_0100;
|
||||
const SHUTDOWN = 0b0000_1000;
|
||||
const READ_DISCONNECTED = 0b0001_0000;
|
||||
const WRITE_DISCONNECTED = 0b0010_0000;
|
||||
const POLLED = 0b0100_0000;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -90,18 +91,22 @@ where
|
||||
{
|
||||
pub fn new(
|
||||
settings: Rc<WorkerSettings<H>>, stream: T, addr: Option<SocketAddr>,
|
||||
buf: BytesMut,
|
||||
buf: BytesMut, is_eof: bool, keepalive_timer: Option<Delay>,
|
||||
) -> Self {
|
||||
Http1 {
|
||||
flags: Flags::KEEPALIVE,
|
||||
flags: if is_eof {
|
||||
Flags::READ_DISCONNECTED
|
||||
} else {
|
||||
Flags::KEEPALIVE
|
||||
},
|
||||
stream: H1Writer::new(stream, Rc::clone(&settings)),
|
||||
decoder: H1Decoder::new(),
|
||||
payload: None,
|
||||
tasks: VecDeque::new(),
|
||||
keepalive_timer: None,
|
||||
addr,
|
||||
buf,
|
||||
settings,
|
||||
keepalive_timer,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -117,6 +122,13 @@ where
|
||||
|
||||
#[inline]
|
||||
fn can_read(&self) -> bool {
|
||||
if self
|
||||
.flags
|
||||
.intersects(Flags::ERROR | Flags::READ_DISCONNECTED)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if let Some(ref info) = self.payload {
|
||||
info.need_read() == PayloadStatus::Read
|
||||
} else {
|
||||
@@ -125,6 +137,8 @@ where
|
||||
}
|
||||
|
||||
fn notify_disconnect(&mut self) {
|
||||
self.flags.insert(Flags::WRITE_DISCONNECTED);
|
||||
|
||||
// notify all tasks
|
||||
self.stream.disconnected();
|
||||
for task in &mut self.tasks {
|
||||
@@ -132,6 +146,21 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
fn client_disconnect(&mut self) {
|
||||
// notify all tasks
|
||||
self.notify_disconnect();
|
||||
// kill keepalive
|
||||
self.keepalive_timer.take();
|
||||
|
||||
// on parse error, stop reading stream but tasks need to be
|
||||
// completed
|
||||
self.flags.insert(Flags::ERROR);
|
||||
|
||||
if let Some(mut payload) = self.payload.take() {
|
||||
payload.set_error(PayloadError::Incomplete);
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn poll(&mut self) -> Poll<(), ()> {
|
||||
// keep-alive timer
|
||||
@@ -148,6 +177,11 @@ where
|
||||
|
||||
// shutdown
|
||||
if self.flags.contains(Flags::SHUTDOWN) {
|
||||
if self.flags.intersects(
|
||||
Flags::ERROR | Flags::READ_DISCONNECTED | Flags::WRITE_DISCONNECTED,
|
||||
) {
|
||||
return Ok(Async::Ready(()));
|
||||
}
|
||||
match self.stream.poll_completed(true) {
|
||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||
Ok(Async::Ready(_)) => return Ok(Async::Ready(())),
|
||||
@@ -182,44 +216,25 @@ where
|
||||
self.flags.insert(Flags::POLLED);
|
||||
return;
|
||||
}
|
||||
|
||||
// read io from socket
|
||||
if !self.flags.intersects(Flags::ERROR)
|
||||
&& self.tasks.len() < MAX_PIPELINED_MESSAGES
|
||||
&& self.can_read()
|
||||
{
|
||||
if self.can_read() && self.tasks.len() < MAX_PIPELINED_MESSAGES {
|
||||
match self.stream.get_mut().read_available(&mut self.buf) {
|
||||
Ok(Async::Ready(disconnected)) => {
|
||||
if disconnected {
|
||||
// notify all tasks
|
||||
self.notify_disconnect();
|
||||
// kill keepalive
|
||||
self.keepalive_timer.take();
|
||||
|
||||
// on parse error, stop reading stream but tasks need to be
|
||||
// completed
|
||||
self.flags.insert(Flags::ERROR);
|
||||
|
||||
if let Some(mut payload) = self.payload.take() {
|
||||
payload.set_error(PayloadError::Incomplete);
|
||||
}
|
||||
} else {
|
||||
Ok(Async::Ready((read_some, disconnected))) => {
|
||||
if read_some {
|
||||
self.parse();
|
||||
}
|
||||
if disconnected {
|
||||
// delay disconnect until all tasks have finished.
|
||||
self.flags.insert(Flags::READ_DISCONNECTED);
|
||||
if self.tasks.is_empty() {
|
||||
self.client_disconnect();
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(Async::NotReady) => (),
|
||||
Err(_) => {
|
||||
// notify all tasks
|
||||
self.notify_disconnect();
|
||||
// kill keepalive
|
||||
self.keepalive_timer.take();
|
||||
|
||||
// on parse error, stop reading stream but tasks need to be
|
||||
// completed
|
||||
self.flags.insert(Flags::ERROR);
|
||||
|
||||
if let Some(mut payload) = self.payload.take() {
|
||||
payload.set_error(PayloadError::Incomplete);
|
||||
}
|
||||
self.client_disconnect();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -233,7 +248,10 @@ where
|
||||
let mut idx = 0;
|
||||
while idx < self.tasks.len() {
|
||||
// only one task can do io operation in http/1
|
||||
if !io && !self.tasks[idx].flags.contains(EntryFlags::EOF) {
|
||||
if !io
|
||||
&& !self.tasks[idx].flags.contains(EntryFlags::EOF)
|
||||
&& !self.flags.contains(Flags::WRITE_DISCONNECTED)
|
||||
{
|
||||
// io is corrupted, send buffer
|
||||
if self.tasks[idx].flags.contains(EntryFlags::ERROR) {
|
||||
if let Ok(Async::NotReady) = self.stream.poll_completed(true) {
|
||||
@@ -297,7 +315,6 @@ where
|
||||
}
|
||||
|
||||
// cleanup finished tasks
|
||||
let max = self.tasks.len() >= MAX_PIPELINED_MESSAGES;
|
||||
while !self.tasks.is_empty() {
|
||||
if self.tasks[0]
|
||||
.flags
|
||||
@@ -308,10 +325,6 @@ where
|
||||
break;
|
||||
}
|
||||
}
|
||||
// read more message
|
||||
if max && self.tasks.len() >= MAX_PIPELINED_MESSAGES {
|
||||
return Ok(Async::Ready(true));
|
||||
}
|
||||
|
||||
// check stream state
|
||||
if self.flags.contains(Flags::STARTED) {
|
||||
@@ -331,8 +344,12 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
// deal with keep-alive
|
||||
// deal with keep-alive and steam eof (client-side write shutdown)
|
||||
if self.tasks.is_empty() {
|
||||
// handle stream eof
|
||||
if self.flags.contains(Flags::READ_DISCONNECTED) {
|
||||
return Ok(Async::Ready(false));
|
||||
}
|
||||
// no keep-alive
|
||||
if self.flags.contains(Flags::ERROR)
|
||||
|| (!self.flags.contains(Flags::KEEPALIVE)
|
||||
@@ -347,7 +364,7 @@ where
|
||||
if self.keepalive_timer.is_none() && keep_alive > 0 {
|
||||
trace!("Start keep-alive timer");
|
||||
let mut timer =
|
||||
Delay::new(Instant::now() + Duration::new(keep_alive, 0));
|
||||
Delay::new(Instant::now() + Duration::from_secs(keep_alive));
|
||||
// register timer
|
||||
let _ = timer.poll();
|
||||
self.keepalive_timer = Some(timer);
|
||||
@@ -448,7 +465,14 @@ where
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(None) => break,
|
||||
Ok(None) => {
|
||||
if self.flags.contains(Flags::READ_DISCONNECTED)
|
||||
&& self.tasks.is_empty()
|
||||
{
|
||||
self.client_disconnect();
|
||||
}
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
self.flags.insert(Flags::ERROR);
|
||||
if let Some(mut payload) = self.payload.take() {
|
||||
@@ -603,24 +627,36 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_req_parse() {
|
||||
fn test_req_parse1() {
|
||||
let buf = Buffer::new("GET /test HTTP/1.1\r\n\r\n");
|
||||
let readbuf = BytesMut::new();
|
||||
let settings = Rc::new(wrk_settings());
|
||||
|
||||
let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf);
|
||||
let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf, false, None);
|
||||
h1.poll_io();
|
||||
h1.poll_io();
|
||||
assert_eq!(h1.tasks.len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_req_parse2() {
|
||||
let buf = Buffer::new("");
|
||||
let readbuf =
|
||||
BytesMut::from(Vec::<u8>::from(&b"GET /test HTTP/1.1\r\n\r\n"[..]));
|
||||
let settings = Rc::new(wrk_settings());
|
||||
|
||||
let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf, true, None);
|
||||
h1.poll_io();
|
||||
assert_eq!(h1.tasks.len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_req_parse_err() {
|
||||
let buf = Buffer::new("GET /test HTTP/1\r\n\r\n");
|
||||
let readbuf = BytesMut::new();
|
||||
let settings = Rc::new(wrk_settings());
|
||||
|
||||
let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf);
|
||||
let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf, false, None);
|
||||
h1.poll_io();
|
||||
h1.poll_io();
|
||||
assert!(h1.flags.contains(Flags::ERROR));
|
||||
|
@@ -63,7 +63,9 @@ impl<T: AsyncWrite, H: 'static> H1Writer<T, H> {
|
||||
self.flags = Flags::KEEPALIVE;
|
||||
}
|
||||
|
||||
pub fn disconnected(&mut self) {}
|
||||
pub fn disconnected(&mut self) {
|
||||
self.flags.insert(Flags::DISCONNECTED);
|
||||
}
|
||||
|
||||
pub fn keepalive(&self) -> bool {
|
||||
self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE)
|
||||
@@ -268,10 +270,7 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
|
||||
let pl: &[u8] = payload.as_ref();
|
||||
let n = match Self::write_data(&mut self.stream, pl) {
|
||||
Err(err) => {
|
||||
if err.kind() == io::ErrorKind::WriteZero {
|
||||
self.disconnected();
|
||||
}
|
||||
|
||||
self.disconnected();
|
||||
return Err(err);
|
||||
}
|
||||
Ok(val) => val,
|
||||
@@ -315,14 +314,15 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
|
||||
|
||||
#[inline]
|
||||
fn poll_completed(&mut self, shutdown: bool) -> Poll<(), io::Error> {
|
||||
if self.flags.contains(Flags::DISCONNECTED) {
|
||||
return Err(io::Error::new(io::ErrorKind::Other, "disconnected"));
|
||||
}
|
||||
|
||||
if !self.buffer.is_empty() {
|
||||
let written = {
|
||||
match Self::write_data(&mut self.stream, self.buffer.as_ref().as_ref()) {
|
||||
Err(err) => {
|
||||
if err.kind() == io::ErrorKind::WriteZero {
|
||||
self.disconnected();
|
||||
}
|
||||
|
||||
self.disconnected();
|
||||
return Err(err);
|
||||
}
|
||||
Ok(val) => val,
|
||||
@@ -339,7 +339,7 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
|
||||
self.stream.poll_flush()?;
|
||||
self.stream.shutdown()
|
||||
} else {
|
||||
self.stream.poll_flush()
|
||||
Ok(self.stream.poll_flush()?)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -59,6 +59,7 @@ where
|
||||
{
|
||||
pub fn new(
|
||||
settings: Rc<WorkerSettings<H>>, io: T, addr: Option<SocketAddr>, buf: Bytes,
|
||||
keepalive_timer: Option<Delay>,
|
||||
) -> Self {
|
||||
let extensions = io.extensions();
|
||||
Http2 {
|
||||
@@ -68,10 +69,10 @@ where
|
||||
unread: if buf.is_empty() { None } else { Some(buf) },
|
||||
inner: io,
|
||||
})),
|
||||
keepalive_timer: None,
|
||||
addr,
|
||||
settings,
|
||||
extensions,
|
||||
keepalive_timer,
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -8,7 +8,6 @@ use actix::{Actor, Addr, Arbiter, AsyncContext, Context, Handler, System};
|
||||
use futures::{Future, Stream};
|
||||
use net2::{TcpBuilder, TcpStreamExt};
|
||||
use num_cpus;
|
||||
use tokio::executor::current_thread;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use tokio_tcp::TcpStream;
|
||||
|
||||
@@ -71,9 +70,9 @@ where
|
||||
factory: Arc::new(f),
|
||||
host: None,
|
||||
backlog: 2048,
|
||||
keep_alive: KeepAlive::Os,
|
||||
keep_alive: KeepAlive::Timeout(5),
|
||||
shutdown_timeout: 30,
|
||||
exit: true,
|
||||
exit: false,
|
||||
no_http2: false,
|
||||
no_signals: false,
|
||||
maxconn: 102_400,
|
||||
@@ -132,7 +131,7 @@ where
|
||||
|
||||
/// Set server keep-alive setting.
|
||||
///
|
||||
/// By default keep alive is set to a `Os`.
|
||||
/// By default keep alive is set to a 5 seconds.
|
||||
pub fn keep_alive<T: Into<KeepAlive>>(mut self, val: T) -> Self {
|
||||
self.keep_alive = val.into();
|
||||
self
|
||||
@@ -637,7 +636,9 @@ where
|
||||
|
||||
fn shutdown(&self, force: bool) {
|
||||
if force {
|
||||
self.settings.head().traverse::<TcpStream, H>();
|
||||
self.settings
|
||||
.head()
|
||||
.traverse(|ch: &mut HttpChannel<TcpStream, H>| ch.shutdown());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -693,7 +694,7 @@ where
|
||||
};
|
||||
let _ = io.set_nodelay(true);
|
||||
|
||||
current_thread::spawn(HttpChannel::new(h, io, peer));
|
||||
Arbiter::spawn(HttpChannel::new(h, io, peer));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -753,10 +754,10 @@ where
|
||||
let _ = io.set_nodelay(true);
|
||||
|
||||
let rate = h.connection_rate();
|
||||
current_thread::spawn(self.acceptor.accept(io).then(move |res| {
|
||||
Arbiter::spawn(self.acceptor.accept(io).then(move |res| {
|
||||
drop(rate);
|
||||
match res {
|
||||
Ok(io) => current_thread::spawn(HttpChannel::new(h, io, peer)),
|
||||
Ok(io) => Arbiter::spawn(HttpChannel::new(h, io, peer)),
|
||||
Err(err) => trace!("Can not establish connection: {}", err),
|
||||
}
|
||||
Ok(())
|
||||
|
@@ -390,7 +390,7 @@ pub trait IoStream: AsyncRead + AsyncWrite + 'static {
|
||||
|
||||
fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()>;
|
||||
|
||||
fn read_available(&mut self, buf: &mut BytesMut) -> Poll<bool, io::Error> {
|
||||
fn read_available(&mut self, buf: &mut BytesMut) -> Poll<(bool, bool), io::Error> {
|
||||
let mut read_some = false;
|
||||
loop {
|
||||
if buf.remaining_mut() < LW_BUFFER_SIZE {
|
||||
@@ -400,7 +400,7 @@ pub trait IoStream: AsyncRead + AsyncWrite + 'static {
|
||||
match self.read(buf.bytes_mut()) {
|
||||
Ok(n) => {
|
||||
if n == 0 {
|
||||
return Ok(Async::Ready(!read_some));
|
||||
return Ok(Async::Ready((read_some, true)));
|
||||
} else {
|
||||
read_some = true;
|
||||
buf.advance_mut(n);
|
||||
@@ -409,7 +409,7 @@ pub trait IoStream: AsyncRead + AsyncWrite + 'static {
|
||||
Err(e) => {
|
||||
return if e.kind() == io::ErrorKind::WouldBlock {
|
||||
if read_some {
|
||||
Ok(Async::Ready(false))
|
||||
Ok(Async::Ready((read_some, false)))
|
||||
} else {
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
|
@@ -217,7 +217,7 @@ impl Server {
|
||||
// start accept thread
|
||||
for sock in &self.sockets {
|
||||
for s in sock.iter() {
|
||||
info!("Starting server on http://{:?}", s.1.local_addr().ok());
|
||||
info!("Starting server on http://{}", s.1.local_addr().unwrap());
|
||||
}
|
||||
}
|
||||
let rx = self
|
||||
|
@@ -13,7 +13,7 @@ use http::StatusCode;
|
||||
use lazycell::LazyCell;
|
||||
use parking_lot::Mutex;
|
||||
use time;
|
||||
use tokio_timer::Interval;
|
||||
use tokio_timer::{Delay, Interval};
|
||||
|
||||
use super::channel::Node;
|
||||
use super::message::{Request, RequestPool};
|
||||
@@ -197,6 +197,16 @@ impl<H> WorkerSettings<H> {
|
||||
&self.h
|
||||
}
|
||||
|
||||
pub fn keep_alive_timer(&self) -> Option<Delay> {
|
||||
if self.keep_alive != 0 {
|
||||
Some(Delay::new(
|
||||
Instant::now() + Duration::from_secs(self.keep_alive),
|
||||
))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn keep_alive(&self) -> u64 {
|
||||
self.keep_alive
|
||||
}
|
||||
|
@@ -120,6 +120,7 @@ impl TestServer {
|
||||
HttpServer::new(factory)
|
||||
.disable_signals()
|
||||
.listen(tcp)
|
||||
.keep_alive(5)
|
||||
.start();
|
||||
|
||||
tx.send((System::current(), local_addr, TestServer::get_conn()))
|
||||
@@ -328,6 +329,7 @@ impl<S: 'static> TestServerBuilder<S> {
|
||||
config(&mut app);
|
||||
vec![app]
|
||||
}).workers(1)
|
||||
.keep_alive(5)
|
||||
.disable_signals();
|
||||
|
||||
tx.send((System::current(), addr, TestServer::get_conn()))
|
||||
|
@@ -8,7 +8,8 @@ extern crate rand;
|
||||
#[cfg(all(unix, feature = "uds"))]
|
||||
extern crate tokio_uds;
|
||||
|
||||
use std::io::Read;
|
||||
use std::io::{Read, Write};
|
||||
use std::{net, thread};
|
||||
|
||||
use bytes::Bytes;
|
||||
use flate2::read::GzDecoder;
|
||||
@@ -66,6 +67,16 @@ fn test_simple() {
|
||||
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_connection_close() {
|
||||
let mut srv =
|
||||
test::TestServer::new(|app| app.handler(|_| HttpResponse::Ok().body(STR)));
|
||||
|
||||
let request = srv.get().header("Connection", "close").finish().unwrap();
|
||||
let response = srv.execute(request.send()).unwrap();
|
||||
assert!(response.status().is_success());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_with_query_parameter() {
|
||||
let mut srv = test::TestServer::new(|app| {
|
||||
@@ -396,24 +407,29 @@ fn test_client_cookie_handling() {
|
||||
let cookie2 = cookie2b.clone();
|
||||
app.handler(move |req: &HttpRequest| {
|
||||
// Check cookies were sent correctly
|
||||
req.cookie("cookie1").ok_or_else(err)
|
||||
.and_then(|c1| if c1.value() == "value1" {
|
||||
req.cookie("cookie1")
|
||||
.ok_or_else(err)
|
||||
.and_then(|c1| {
|
||||
if c1.value() == "value1" {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(err())
|
||||
})
|
||||
.and_then(|()| req.cookie("cookie2").ok_or_else(err))
|
||||
.and_then(|c2| if c2.value() == "value2" {
|
||||
}
|
||||
}).and_then(|()| req.cookie("cookie2").ok_or_else(err))
|
||||
.and_then(|c2| {
|
||||
if c2.value() == "value2" {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(err())
|
||||
})
|
||||
// Send some cookies back
|
||||
.map(|_| HttpResponse::Ok()
|
||||
.cookie(cookie1.clone())
|
||||
.cookie(cookie2.clone())
|
||||
.finish()
|
||||
)
|
||||
}
|
||||
})
|
||||
// Send some cookies back
|
||||
.map(|_| {
|
||||
HttpResponse::Ok()
|
||||
.cookie(cookie1.clone())
|
||||
.cookie(cookie2.clone())
|
||||
.finish()
|
||||
})
|
||||
})
|
||||
});
|
||||
|
||||
@@ -460,3 +476,33 @@ fn test_default_headers() {
|
||||
"\""
|
||||
)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn client_read_until_eof() {
|
||||
let addr = test::TestServer::unused_addr();
|
||||
|
||||
thread::spawn(move || {
|
||||
let lst = net::TcpListener::bind(addr).unwrap();
|
||||
|
||||
for stream in lst.incoming() {
|
||||
let mut stream = stream.unwrap();
|
||||
let mut b = [0; 1000];
|
||||
let _ = stream.read(&mut b).unwrap();
|
||||
let _ = stream
|
||||
.write_all(b"HTTP/1.1 200 OK\r\nconnection: close\r\n\r\nwelcome!");
|
||||
}
|
||||
});
|
||||
|
||||
let mut sys = actix::System::new("test");
|
||||
|
||||
// client request
|
||||
let req = client::ClientRequest::get(format!("http://{}/", addr).as_str())
|
||||
.finish()
|
||||
.unwrap();
|
||||
let response = sys.block_on(req.send()).unwrap();
|
||||
assert!(response.status().is_success());
|
||||
|
||||
// read response
|
||||
let bytes = sys.block_on(response.body()).unwrap();
|
||||
assert_eq!(bytes, Bytes::from_static(b"welcome!"));
|
||||
}
|
||||
|
@@ -932,6 +932,28 @@ fn test_application() {
|
||||
assert!(response.status().is_success());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_default_404_handler_response() {
|
||||
let mut srv = test::TestServer::with_factory(|| {
|
||||
App::new()
|
||||
.prefix("/app")
|
||||
.resource("", |r| r.f(|_| HttpResponse::Ok()))
|
||||
.resource("/", |r| r.f(|_| HttpResponse::Ok()))
|
||||
});
|
||||
let addr = srv.addr();
|
||||
|
||||
let mut buf = [0; 24];
|
||||
let request = TcpStream::connect(&addr)
|
||||
.and_then(|sock| {
|
||||
tokio::io::write_all(sock, "HEAD / HTTP/1.1\r\nHost: localhost\r\n\r\n")
|
||||
.and_then(|(sock, _)| tokio::io::read_exact(sock, &mut buf))
|
||||
.and_then(|(_, buf)| Ok(buf))
|
||||
}).map_err(|e| panic!("{:?}", e));
|
||||
let response = srv.execute(request).unwrap();
|
||||
let rep = String::from_utf8_lossy(&response[..]);
|
||||
assert!(rep.contains("HTTP/1.1 404 Not Found"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_server_cookies() {
|
||||
use actix_web::http;
|
||||
|
Reference in New Issue
Block a user