mirror of
https://github.com/fafhrd91/actix-web
synced 2025-07-18 15:35:43 +02:00
Compare commits
29 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
4ca9fd2ad1 | ||
|
f0f67072ae | ||
|
24d1228943 | ||
|
b7a73e0a4f | ||
|
968c81e267 | ||
|
d5957a8466 | ||
|
f2f05e7715 | ||
|
3439f55288 | ||
|
0425e2776f | ||
|
6464f96f8b | ||
|
a2b170fec9 | ||
|
0b42cae082 | ||
|
c313c003a4 | ||
|
3fa23f5e10 | ||
|
2d51831899 | ||
|
e59abfd716 | ||
|
66881d7dd1 | ||
|
a42a8a2321 | ||
|
2341656173 | ||
|
487519acec | ||
|
af6caa92c8 | ||
|
3ccbce6bc8 | ||
|
797b52ecbf | ||
|
4bab50c861 | ||
|
5906971b6d | ||
|
8393d09a0f | ||
|
c3ae9997fc | ||
|
d39dcc58cd | ||
|
471a3e9806 |
21
CHANGES.md
21
CHANGES.md
@@ -1,5 +1,26 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
|
## [0.7.5] - 2018-09-04
|
||||||
|
|
||||||
|
### Added
|
||||||
|
|
||||||
|
* Added the ability to pass a custom `TlsConnector`.
|
||||||
|
|
||||||
|
* Allow to register handlers on scope level #465
|
||||||
|
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
|
||||||
|
* Handle socket read disconnect
|
||||||
|
|
||||||
|
* Handling scoped paths without leading slashes #460
|
||||||
|
|
||||||
|
|
||||||
|
### Changed
|
||||||
|
|
||||||
|
* Read client response until eof if connection header set to close #464
|
||||||
|
|
||||||
|
|
||||||
## [0.7.4] - 2018-08-23
|
## [0.7.4] - 2018-08-23
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actix-web"
|
name = "actix-web"
|
||||||
version = "0.7.4"
|
version = "0.7.5"
|
||||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||||
description = "Actix web is a simple, pragmatic and extremely fast web framework for Rust."
|
description = "Actix web is a simple, pragmatic and extremely fast web framework for Rust."
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
|
@@ -447,11 +447,8 @@ where
|
|||||||
{
|
{
|
||||||
let mut path = path.trim().trim_right_matches('/').to_owned();
|
let mut path = path.trim().trim_right_matches('/').to_owned();
|
||||||
if !path.is_empty() && !path.starts_with('/') {
|
if !path.is_empty() && !path.starts_with('/') {
|
||||||
path.insert(0, '/')
|
path.insert(0, '/');
|
||||||
}
|
};
|
||||||
if path.len() > 1 && path.ends_with('/') {
|
|
||||||
path.pop();
|
|
||||||
}
|
|
||||||
self.parts
|
self.parts
|
||||||
.as_mut()
|
.as_mut()
|
||||||
.expect("Use after finish")
|
.expect("Use after finish")
|
||||||
|
@@ -17,57 +17,34 @@ use tokio_io::{AsyncRead, AsyncWrite};
|
|||||||
use tokio_timer::Delay;
|
use tokio_timer::Delay;
|
||||||
|
|
||||||
#[cfg(feature = "alpn")]
|
#[cfg(feature = "alpn")]
|
||||||
use openssl::ssl::{Error as OpensslError, SslConnector, SslMethod};
|
use {
|
||||||
#[cfg(feature = "alpn")]
|
openssl::ssl::{Error as SslError, SslConnector, SslMethod},
|
||||||
use tokio_openssl::SslConnectorExt;
|
tokio_openssl::SslConnectorExt,
|
||||||
|
};
|
||||||
|
|
||||||
#[cfg(all(feature = "tls", not(feature = "alpn")))]
|
#[cfg(all(feature = "tls", not(feature = "alpn")))]
|
||||||
use native_tls::{Error as TlsError, TlsConnector as NativeTlsConnector};
|
use {
|
||||||
#[cfg(all(feature = "tls", not(feature = "alpn")))]
|
native_tls::{Error as SslError, TlsConnector as NativeTlsConnector},
|
||||||
use tokio_tls::{TlsConnector};
|
tokio_tls::TlsConnector as SslConnector,
|
||||||
|
};
|
||||||
|
|
||||||
#[cfg(
|
#[cfg(all(
|
||||||
all(
|
|
||||||
feature = "rust-tls",
|
feature = "rust-tls",
|
||||||
not(any(feature = "alpn", feature = "tls"))
|
not(any(feature = "alpn", feature = "tls"))
|
||||||
)
|
))]
|
||||||
)]
|
use {
|
||||||
use rustls::ClientConfig;
|
rustls::ClientConfig, std::io::Error as SslError, std::sync::Arc,
|
||||||
#[cfg(
|
tokio_rustls::ClientConfigExt, webpki::DNSNameRef, webpki_roots,
|
||||||
all(
|
};
|
||||||
|
|
||||||
|
#[cfg(all(
|
||||||
feature = "rust-tls",
|
feature = "rust-tls",
|
||||||
not(any(feature = "alpn", feature = "tls"))
|
not(any(feature = "alpn", feature = "tls"))
|
||||||
)
|
))]
|
||||||
)]
|
type SslConnector = Arc<ClientConfig>;
|
||||||
use std::io::Error as TLSError;
|
|
||||||
#[cfg(
|
#[cfg(not(any(feature = "alpn", feature = "tls", feature = "rust-tls")))]
|
||||||
all(
|
type SslConnector = ();
|
||||||
feature = "rust-tls",
|
|
||||||
not(any(feature = "alpn", feature = "tls"))
|
|
||||||
)
|
|
||||||
)]
|
|
||||||
use std::sync::Arc;
|
|
||||||
#[cfg(
|
|
||||||
all(
|
|
||||||
feature = "rust-tls",
|
|
||||||
not(any(feature = "alpn", feature = "tls"))
|
|
||||||
)
|
|
||||||
)]
|
|
||||||
use tokio_rustls::ClientConfigExt;
|
|
||||||
#[cfg(
|
|
||||||
all(
|
|
||||||
feature = "rust-tls",
|
|
||||||
not(any(feature = "alpn", feature = "tls"))
|
|
||||||
)
|
|
||||||
)]
|
|
||||||
use webpki::DNSNameRef;
|
|
||||||
#[cfg(
|
|
||||||
all(
|
|
||||||
feature = "rust-tls",
|
|
||||||
not(any(feature = "alpn", feature = "tls"))
|
|
||||||
)
|
|
||||||
)]
|
|
||||||
use webpki_roots;
|
|
||||||
|
|
||||||
use server::IoStream;
|
use server::IoStream;
|
||||||
use {HAS_OPENSSL, HAS_RUSTLS, HAS_TLS};
|
use {HAS_OPENSSL, HAS_RUSTLS, HAS_TLS};
|
||||||
@@ -173,24 +150,9 @@ pub enum ClientConnectorError {
|
|||||||
SslIsNotSupported,
|
SslIsNotSupported,
|
||||||
|
|
||||||
/// SSL error
|
/// SSL error
|
||||||
#[cfg(feature = "alpn")]
|
#[cfg(any(feature = "tls", feature = "alpn", feature = "rust-tls"))]
|
||||||
#[fail(display = "{}", _0)]
|
#[fail(display = "{}", _0)]
|
||||||
SslError(#[cause] OpensslError),
|
SslError(#[cause] SslError),
|
||||||
|
|
||||||
/// SSL error
|
|
||||||
#[cfg(all(feature = "tls", not(feature = "alpn")))]
|
|
||||||
#[fail(display = "{}", _0)]
|
|
||||||
SslError(#[cause] TlsError),
|
|
||||||
|
|
||||||
/// SSL error
|
|
||||||
#[cfg(
|
|
||||||
all(
|
|
||||||
feature = "rust-tls",
|
|
||||||
not(any(feature = "alpn", feature = "tls"))
|
|
||||||
)
|
|
||||||
)]
|
|
||||||
#[fail(display = "{}", _0)]
|
|
||||||
SslError(#[cause] TLSError),
|
|
||||||
|
|
||||||
/// Resolver error
|
/// Resolver error
|
||||||
#[fail(display = "{}", _0)]
|
#[fail(display = "{}", _0)]
|
||||||
@@ -242,17 +204,7 @@ impl Paused {
|
|||||||
/// `ClientConnector` type is responsible for transport layer of a
|
/// `ClientConnector` type is responsible for transport layer of a
|
||||||
/// client connection.
|
/// client connection.
|
||||||
pub struct ClientConnector {
|
pub struct ClientConnector {
|
||||||
#[cfg(all(feature = "alpn"))]
|
|
||||||
connector: SslConnector,
|
connector: SslConnector,
|
||||||
#[cfg(all(feature = "tls", not(feature = "alpn")))]
|
|
||||||
connector: TlsConnector,
|
|
||||||
#[cfg(
|
|
||||||
all(
|
|
||||||
feature = "rust-tls",
|
|
||||||
not(any(feature = "alpn", feature = "tls"))
|
|
||||||
)
|
|
||||||
)]
|
|
||||||
connector: Arc<ClientConfig>,
|
|
||||||
|
|
||||||
stats: ClientConnectorStats,
|
stats: ClientConnectorStats,
|
||||||
subscriber: Option<Recipient<ClientConnectorStats>>,
|
subscriber: Option<Recipient<ClientConnectorStats>>,
|
||||||
@@ -293,71 +245,36 @@ impl SystemService for ClientConnector {}
|
|||||||
|
|
||||||
impl Default for ClientConnector {
|
impl Default for ClientConnector {
|
||||||
fn default() -> ClientConnector {
|
fn default() -> ClientConnector {
|
||||||
|
let connector = {
|
||||||
#[cfg(all(feature = "alpn"))]
|
#[cfg(all(feature = "alpn"))]
|
||||||
{
|
{
|
||||||
let builder = SslConnector::builder(SslMethod::tls()).unwrap();
|
SslConnector::builder(SslMethod::tls()).unwrap().build()
|
||||||
ClientConnector::with_connector(builder.build())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(all(feature = "tls", not(feature = "alpn")))]
|
#[cfg(all(feature = "tls", not(feature = "alpn")))]
|
||||||
{
|
{
|
||||||
let (tx, rx) = mpsc::unbounded();
|
NativeTlsConnector::builder().build().unwrap().into()
|
||||||
let builder = NativeTlsConnector::builder();
|
|
||||||
ClientConnector {
|
|
||||||
stats: ClientConnectorStats::default(),
|
|
||||||
subscriber: None,
|
|
||||||
acq_tx: tx,
|
|
||||||
acq_rx: Some(rx),
|
|
||||||
resolver: None,
|
|
||||||
connector: builder.build().unwrap().into(),
|
|
||||||
conn_lifetime: Duration::from_secs(75),
|
|
||||||
conn_keep_alive: Duration::from_secs(15),
|
|
||||||
limit: 100,
|
|
||||||
limit_per_host: 0,
|
|
||||||
acquired: 0,
|
|
||||||
acquired_per_host: HashMap::new(),
|
|
||||||
available: HashMap::new(),
|
|
||||||
to_close: Vec::new(),
|
|
||||||
waiters: Some(HashMap::new()),
|
|
||||||
wait_timeout: None,
|
|
||||||
paused: Paused::No,
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
#[cfg(
|
#[cfg(all(
|
||||||
all(
|
|
||||||
feature = "rust-tls",
|
feature = "rust-tls",
|
||||||
not(any(feature = "alpn", feature = "tls"))
|
not(any(feature = "alpn", feature = "tls"))
|
||||||
)
|
))]
|
||||||
)]
|
|
||||||
{
|
{
|
||||||
let mut config = ClientConfig::new();
|
let mut config = ClientConfig::new();
|
||||||
config
|
config
|
||||||
.root_store
|
.root_store
|
||||||
.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
|
.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
|
||||||
ClientConnector::with_connector(config)
|
Arc::new(config)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(not(any(feature = "alpn", feature = "tls", feature = "rust-tls")))]
|
#[cfg(not(any(feature = "alpn", feature = "tls", feature = "rust-tls")))]
|
||||||
{
|
{
|
||||||
let (tx, rx) = mpsc::unbounded();
|
()
|
||||||
ClientConnector {
|
|
||||||
stats: ClientConnectorStats::default(),
|
|
||||||
subscriber: None,
|
|
||||||
acq_tx: tx,
|
|
||||||
acq_rx: Some(rx),
|
|
||||||
resolver: None,
|
|
||||||
conn_lifetime: Duration::from_secs(75),
|
|
||||||
conn_keep_alive: Duration::from_secs(15),
|
|
||||||
limit: 100,
|
|
||||||
limit_per_host: 0,
|
|
||||||
acquired: 0,
|
|
||||||
acquired_per_host: HashMap::new(),
|
|
||||||
available: HashMap::new(),
|
|
||||||
to_close: Vec::new(),
|
|
||||||
waiters: Some(HashMap::new()),
|
|
||||||
wait_timeout: None,
|
|
||||||
paused: Paused::No,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
ClientConnector::with_connector_impl(connector)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -375,7 +292,6 @@ impl ClientConnector {
|
|||||||
/// # extern crate futures;
|
/// # extern crate futures;
|
||||||
/// # use futures::{future, Future};
|
/// # use futures::{future, Future};
|
||||||
/// # use std::io::Write;
|
/// # use std::io::Write;
|
||||||
/// # use std::process;
|
|
||||||
/// # use actix_web::actix::Actor;
|
/// # use actix_web::actix::Actor;
|
||||||
/// extern crate openssl;
|
/// extern crate openssl;
|
||||||
/// use actix_web::{actix, client::ClientConnector, client::Connect};
|
/// use actix_web::{actix, client::ClientConnector, client::Connect};
|
||||||
@@ -402,35 +318,14 @@ impl ClientConnector {
|
|||||||
/// }
|
/// }
|
||||||
/// ```
|
/// ```
|
||||||
pub fn with_connector(connector: SslConnector) -> ClientConnector {
|
pub fn with_connector(connector: SslConnector) -> ClientConnector {
|
||||||
let (tx, rx) = mpsc::unbounded();
|
// keep level of indirection for docstrings matching featureflags
|
||||||
|
Self::with_connector_impl(connector)
|
||||||
ClientConnector {
|
|
||||||
connector,
|
|
||||||
stats: ClientConnectorStats::default(),
|
|
||||||
subscriber: None,
|
|
||||||
acq_tx: tx,
|
|
||||||
acq_rx: Some(rx),
|
|
||||||
resolver: None,
|
|
||||||
conn_lifetime: Duration::from_secs(75),
|
|
||||||
conn_keep_alive: Duration::from_secs(15),
|
|
||||||
limit: 100,
|
|
||||||
limit_per_host: 0,
|
|
||||||
acquired: 0,
|
|
||||||
acquired_per_host: HashMap::new(),
|
|
||||||
available: HashMap::new(),
|
|
||||||
to_close: Vec::new(),
|
|
||||||
waiters: Some(HashMap::new()),
|
|
||||||
wait_timeout: None,
|
|
||||||
paused: Paused::No,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(
|
#[cfg(all(
|
||||||
all(
|
|
||||||
feature = "rust-tls",
|
feature = "rust-tls",
|
||||||
not(any(feature = "alpn", feature = "tls"))
|
not(any(feature = "alpn", feature = "tls"))
|
||||||
)
|
))]
|
||||||
)]
|
|
||||||
/// Create `ClientConnector` actor with custom `SslConnector` instance.
|
/// Create `ClientConnector` actor with custom `SslConnector` instance.
|
||||||
///
|
///
|
||||||
/// By default `ClientConnector` uses very a simple SSL configuration.
|
/// By default `ClientConnector` uses very a simple SSL configuration.
|
||||||
@@ -441,10 +336,8 @@ impl ClientConnector {
|
|||||||
/// # #![cfg(feature = "rust-tls")]
|
/// # #![cfg(feature = "rust-tls")]
|
||||||
/// # extern crate actix_web;
|
/// # extern crate actix_web;
|
||||||
/// # extern crate futures;
|
/// # extern crate futures;
|
||||||
/// # extern crate tokio;
|
|
||||||
/// # use futures::{future, Future};
|
/// # use futures::{future, Future};
|
||||||
/// # use std::io::Write;
|
/// # use std::io::Write;
|
||||||
/// # use std::process;
|
|
||||||
/// # use actix_web::actix::Actor;
|
/// # use actix_web::actix::Actor;
|
||||||
/// extern crate rustls;
|
/// extern crate rustls;
|
||||||
/// extern crate webpki_roots;
|
/// extern crate webpki_roots;
|
||||||
@@ -476,10 +369,61 @@ impl ClientConnector {
|
|||||||
/// }
|
/// }
|
||||||
/// ```
|
/// ```
|
||||||
pub fn with_connector(connector: ClientConfig) -> ClientConnector {
|
pub fn with_connector(connector: ClientConfig) -> ClientConnector {
|
||||||
|
// keep level of indirection for docstrings matching featureflags
|
||||||
|
Self::with_connector_impl(Arc::new(connector))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(all(
|
||||||
|
feature = "tls",
|
||||||
|
not(any(feature = "alpn", feature = "rust-tls"))
|
||||||
|
))]
|
||||||
|
/// Create `ClientConnector` actor with custom `SslConnector` instance.
|
||||||
|
///
|
||||||
|
/// By default `ClientConnector` uses very a simple SSL configuration.
|
||||||
|
/// With `with_connector` method it is possible to use a custom
|
||||||
|
/// `SslConnector` object.
|
||||||
|
///
|
||||||
|
/// ```rust
|
||||||
|
/// # #![cfg(feature = "tls")]
|
||||||
|
/// # extern crate actix_web;
|
||||||
|
/// # extern crate futures;
|
||||||
|
/// # use futures::{future, Future};
|
||||||
|
/// # use std::io::Write;
|
||||||
|
/// # use actix_web::actix::Actor;
|
||||||
|
/// extern crate native_tls;
|
||||||
|
/// extern crate webpki_roots;
|
||||||
|
/// use native_tls::TlsConnector;
|
||||||
|
/// use actix_web::{actix, client::ClientConnector, client::Connect};
|
||||||
|
///
|
||||||
|
/// fn main() {
|
||||||
|
/// actix::run(|| {
|
||||||
|
/// let connector = TlsConnector::new().unwrap();
|
||||||
|
/// let conn = ClientConnector::with_connector(connector.into()).start();
|
||||||
|
///
|
||||||
|
/// conn.send(
|
||||||
|
/// Connect::new("https://www.rust-lang.org").unwrap()) // <- connect to host
|
||||||
|
/// .map_err(|_| ())
|
||||||
|
/// .and_then(|res| {
|
||||||
|
/// if let Ok(mut stream) = res {
|
||||||
|
/// stream.write_all(b"GET / HTTP/1.0\r\n\r\n").unwrap();
|
||||||
|
/// }
|
||||||
|
/// # actix::System::current().stop();
|
||||||
|
/// Ok(())
|
||||||
|
/// })
|
||||||
|
/// });
|
||||||
|
/// }
|
||||||
|
/// ```
|
||||||
|
pub fn with_connector(connector: SslConnector) -> ClientConnector {
|
||||||
|
// keep level of indirection for docstrings matching featureflags
|
||||||
|
Self::with_connector_impl(connector)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn with_connector_impl(connector: SslConnector) -> ClientConnector {
|
||||||
let (tx, rx) = mpsc::unbounded();
|
let (tx, rx) = mpsc::unbounded();
|
||||||
|
|
||||||
ClientConnector {
|
ClientConnector {
|
||||||
connector: Arc::new(connector),
|
connector,
|
||||||
stats: ClientConnectorStats::default(),
|
stats: ClientConnectorStats::default(),
|
||||||
subscriber: None,
|
subscriber: None,
|
||||||
acq_tx: tx,
|
acq_tx: tx,
|
||||||
@@ -853,12 +797,10 @@ impl ClientConnector {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(
|
#[cfg(all(
|
||||||
all(
|
|
||||||
feature = "rust-tls",
|
feature = "rust-tls",
|
||||||
not(any(feature = "alpn", feature = "tls"))
|
not(any(feature = "alpn", feature = "tls"))
|
||||||
)
|
))]
|
||||||
)]
|
|
||||||
match res {
|
match res {
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
let _ = waiter.tx.send(Err(err.into()));
|
let _ = waiter.tx.send(Err(err.into()));
|
||||||
@@ -1344,7 +1286,7 @@ impl AsyncWrite for Connection {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "tls")]
|
#[cfg(feature = "tls")]
|
||||||
use tokio_tls::{TlsStream};
|
use tokio_tls::TlsStream;
|
||||||
|
|
||||||
#[cfg(feature = "tls")]
|
#[cfg(feature = "tls")]
|
||||||
/// This is temp solution untile actix-net migration
|
/// This is temp solution untile actix-net migration
|
||||||
|
@@ -20,6 +20,7 @@ const MAX_HEADERS: usize = 96;
|
|||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub struct HttpResponseParser {
|
pub struct HttpResponseParser {
|
||||||
decoder: Option<EncodingDecoder>,
|
decoder: Option<EncodingDecoder>,
|
||||||
|
eof: bool, // indicate that we read payload until stream eof
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Fail)]
|
#[derive(Debug, Fail)]
|
||||||
@@ -38,41 +39,40 @@ impl HttpResponseParser {
|
|||||||
where
|
where
|
||||||
T: IoStream,
|
T: IoStream,
|
||||||
{
|
{
|
||||||
// if buf is empty parse_message will always return NotReady, let's avoid that
|
|
||||||
if buf.is_empty() {
|
|
||||||
match io.read_available(buf) {
|
|
||||||
Ok(Async::Ready(true)) => {
|
|
||||||
return Err(HttpResponseParserError::Disconnect)
|
|
||||||
}
|
|
||||||
Ok(Async::Ready(false)) => (),
|
|
||||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
|
||||||
Err(err) => return Err(HttpResponseParserError::Error(err.into())),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
// Don't call parser until we have data to parse.
|
||||||
|
if !buf.is_empty() {
|
||||||
match HttpResponseParser::parse_message(buf)
|
match HttpResponseParser::parse_message(buf)
|
||||||
.map_err(HttpResponseParserError::Error)?
|
.map_err(HttpResponseParserError::Error)?
|
||||||
{
|
{
|
||||||
Async::Ready((msg, decoder)) => {
|
Async::Ready((msg, info)) => {
|
||||||
self.decoder = decoder;
|
if let Some((decoder, eof)) = info {
|
||||||
|
self.eof = eof;
|
||||||
|
self.decoder = Some(decoder);
|
||||||
|
} else {
|
||||||
|
self.eof = false;
|
||||||
|
self.decoder = None;
|
||||||
|
}
|
||||||
return Ok(Async::Ready(msg));
|
return Ok(Async::Ready(msg));
|
||||||
}
|
}
|
||||||
Async::NotReady => {
|
Async::NotReady => {
|
||||||
if buf.capacity() >= MAX_BUFFER_SIZE {
|
if buf.capacity() >= MAX_BUFFER_SIZE {
|
||||||
return Err(HttpResponseParserError::Error(ParseError::TooLarge));
|
return Err(HttpResponseParserError::Error(
|
||||||
|
ParseError::TooLarge,
|
||||||
|
));
|
||||||
}
|
}
|
||||||
|
// Parser needs more data.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Read some more data into the buffer for the parser.
|
||||||
match io.read_available(buf) {
|
match io.read_available(buf) {
|
||||||
Ok(Async::Ready(true)) => {
|
Ok(Async::Ready((false, true))) => {
|
||||||
return Err(HttpResponseParserError::Disconnect)
|
return Err(HttpResponseParserError::Disconnect)
|
||||||
}
|
}
|
||||||
Ok(Async::Ready(false)) => (),
|
Ok(Async::Ready(_)) => (),
|
||||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||||
Err(err) => {
|
Err(err) => return Err(HttpResponseParserError::Error(err.into())),
|
||||||
return Err(HttpResponseParserError::Error(err.into()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -87,8 +87,8 @@ impl HttpResponseParser {
|
|||||||
loop {
|
loop {
|
||||||
// read payload
|
// read payload
|
||||||
let (not_ready, stream_finished) = match io.read_available(buf) {
|
let (not_ready, stream_finished) = match io.read_available(buf) {
|
||||||
Ok(Async::Ready(true)) => (false, true),
|
Ok(Async::Ready((_, true))) => (false, true),
|
||||||
Ok(Async::Ready(false)) => (false, false),
|
Ok(Async::Ready((_, false))) => (false, false),
|
||||||
Ok(Async::NotReady) => (true, false),
|
Ok(Async::NotReady) => (true, false),
|
||||||
Err(err) => return Err(err.into()),
|
Err(err) => return Err(err.into()),
|
||||||
};
|
};
|
||||||
@@ -104,9 +104,14 @@ impl HttpResponseParser {
|
|||||||
return Ok(Async::NotReady);
|
return Ok(Async::NotReady);
|
||||||
}
|
}
|
||||||
if stream_finished {
|
if stream_finished {
|
||||||
|
// read untile eof?
|
||||||
|
if self.eof {
|
||||||
|
return Ok(Async::Ready(None));
|
||||||
|
} else {
|
||||||
return Err(PayloadError::Incomplete);
|
return Err(PayloadError::Incomplete);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
Err(err) => return Err(err.into()),
|
Err(err) => return Err(err.into()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -117,7 +122,7 @@ impl HttpResponseParser {
|
|||||||
|
|
||||||
fn parse_message(
|
fn parse_message(
|
||||||
buf: &mut BytesMut,
|
buf: &mut BytesMut,
|
||||||
) -> Poll<(ClientResponse, Option<EncodingDecoder>), ParseError> {
|
) -> Poll<(ClientResponse, Option<(EncodingDecoder, bool)>), ParseError> {
|
||||||
// Unsafe: we read only this data only after httparse parses headers into.
|
// Unsafe: we read only this data only after httparse parses headers into.
|
||||||
// performance bump for pipeline benchmarks.
|
// performance bump for pipeline benchmarks.
|
||||||
let mut headers: [HeaderIndex; MAX_HEADERS] = unsafe { mem::uninitialized() };
|
let mut headers: [HeaderIndex; MAX_HEADERS] = unsafe { mem::uninitialized() };
|
||||||
@@ -163,12 +168,12 @@ impl HttpResponseParser {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let decoder = if status == StatusCode::SWITCHING_PROTOCOLS {
|
let decoder = if status == StatusCode::SWITCHING_PROTOCOLS {
|
||||||
Some(EncodingDecoder::eof())
|
Some((EncodingDecoder::eof(), true))
|
||||||
} else if let Some(len) = hdrs.get(header::CONTENT_LENGTH) {
|
} else if let Some(len) = hdrs.get(header::CONTENT_LENGTH) {
|
||||||
// Content-Length
|
// Content-Length
|
||||||
if let Ok(s) = len.to_str() {
|
if let Ok(s) = len.to_str() {
|
||||||
if let Ok(len) = s.parse::<u64>() {
|
if let Ok(len) = s.parse::<u64>() {
|
||||||
Some(EncodingDecoder::length(len))
|
Some((EncodingDecoder::length(len), false))
|
||||||
} else {
|
} else {
|
||||||
debug!("illegal Content-Length: {:?}", len);
|
debug!("illegal Content-Length: {:?}", len);
|
||||||
return Err(ParseError::Header);
|
return Err(ParseError::Header);
|
||||||
@@ -179,7 +184,18 @@ impl HttpResponseParser {
|
|||||||
}
|
}
|
||||||
} else if chunked(&hdrs)? {
|
} else if chunked(&hdrs)? {
|
||||||
// Chunked encoding
|
// Chunked encoding
|
||||||
Some(EncodingDecoder::chunked())
|
Some((EncodingDecoder::chunked(), false))
|
||||||
|
} else if let Some(value) = hdrs.get(header::CONNECTION) {
|
||||||
|
let close = if let Ok(s) = value.to_str() {
|
||||||
|
s == "close"
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
};
|
||||||
|
if close {
|
||||||
|
Some((EncodingDecoder::eof(), true))
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
};
|
};
|
||||||
|
@@ -236,7 +236,6 @@ macro_rules! FROM_STR {
|
|||||||
($type:ty) => {
|
($type:ty) => {
|
||||||
impl FromParam for $type {
|
impl FromParam for $type {
|
||||||
type Err = InternalError<<$type as FromStr>::Err>;
|
type Err = InternalError<<$type as FromStr>::Err>;
|
||||||
|
|
||||||
fn from_param(val: &str) -> Result<Self, Self::Err> {
|
fn from_param(val: &str) -> Result<Self, Self::Err> {
|
||||||
<$type as FromStr>::from_str(val)
|
<$type as FromStr>::from_str(val)
|
||||||
.map_err(|e| InternalError::new(e, StatusCode::BAD_REQUEST))
|
.map_err(|e| InternalError::new(e, StatusCode::BAD_REQUEST))
|
||||||
|
124
src/scope.rs
124
src/scope.rs
@@ -5,7 +5,10 @@ use std::rc::Rc;
|
|||||||
use futures::{Async, Future, Poll};
|
use futures::{Async, Future, Poll};
|
||||||
|
|
||||||
use error::Error;
|
use error::Error;
|
||||||
use handler::{AsyncResult, AsyncResultItem, FromRequest, Responder, RouteHandler};
|
use handler::{
|
||||||
|
AsyncResult, AsyncResultItem, FromRequest, Handler, Responder, RouteHandler,
|
||||||
|
WrapHandler,
|
||||||
|
};
|
||||||
use http::Method;
|
use http::Method;
|
||||||
use httprequest::HttpRequest;
|
use httprequest::HttpRequest;
|
||||||
use httpresponse::HttpResponse;
|
use httpresponse::HttpResponse;
|
||||||
@@ -180,7 +183,7 @@ impl<S: 'static> Scope<S> {
|
|||||||
where
|
where
|
||||||
F: FnOnce(Scope<S>) -> Scope<S>,
|
F: FnOnce(Scope<S>) -> Scope<S>,
|
||||||
{
|
{
|
||||||
let rdef = ResourceDef::prefix(&path);
|
let rdef = ResourceDef::prefix(&insert_slash(path));
|
||||||
let scope = Scope {
|
let scope = Scope {
|
||||||
rdef: rdef.clone(),
|
rdef: rdef.clone(),
|
||||||
filters: Vec::new(),
|
filters: Vec::new(),
|
||||||
@@ -227,9 +230,11 @@ impl<S: 'static> Scope<S> {
|
|||||||
R: Responder + 'static,
|
R: Responder + 'static,
|
||||||
T: FromRequest<S> + 'static,
|
T: FromRequest<S> + 'static,
|
||||||
{
|
{
|
||||||
Rc::get_mut(&mut self.router)
|
Rc::get_mut(&mut self.router).unwrap().register_route(
|
||||||
.unwrap()
|
&insert_slash(path),
|
||||||
.register_route(path, method, f);
|
method,
|
||||||
|
f,
|
||||||
|
);
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -261,7 +266,7 @@ impl<S: 'static> Scope<S> {
|
|||||||
F: FnOnce(&mut Resource<S>) -> R + 'static,
|
F: FnOnce(&mut Resource<S>) -> R + 'static,
|
||||||
{
|
{
|
||||||
// add resource
|
// add resource
|
||||||
let mut resource = Resource::new(ResourceDef::new(path));
|
let mut resource = Resource::new(ResourceDef::new(&insert_slash(path)));
|
||||||
f(&mut resource);
|
f(&mut resource);
|
||||||
|
|
||||||
Rc::get_mut(&mut self.router)
|
Rc::get_mut(&mut self.router)
|
||||||
@@ -286,6 +291,35 @@ impl<S: 'static> Scope<S> {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Configure handler for specific path prefix.
|
||||||
|
///
|
||||||
|
/// A path prefix consists of valid path segments, i.e for the
|
||||||
|
/// prefix `/app` any request with the paths `/app`, `/app/` or
|
||||||
|
/// `/app/test` would match, but the path `/application` would
|
||||||
|
/// not.
|
||||||
|
///
|
||||||
|
/// ```rust
|
||||||
|
/// # extern crate actix_web;
|
||||||
|
/// use actix_web::{http, App, HttpRequest, HttpResponse};
|
||||||
|
///
|
||||||
|
/// fn main() {
|
||||||
|
/// let app = App::new().scope("/scope-prefix", |scope| {
|
||||||
|
/// scope.handler("/app", |req: &HttpRequest| match *req.method() {
|
||||||
|
/// http::Method::GET => HttpResponse::Ok(),
|
||||||
|
/// http::Method::POST => HttpResponse::MethodNotAllowed(),
|
||||||
|
/// _ => HttpResponse::NotFound(),
|
||||||
|
/// })
|
||||||
|
/// });
|
||||||
|
/// }
|
||||||
|
/// ```
|
||||||
|
pub fn handler<H: Handler<S>>(mut self, path: &str, handler: H) -> Scope<S> {
|
||||||
|
let path = insert_slash(path.trim().trim_right_matches('/'));
|
||||||
|
Rc::get_mut(&mut self.router)
|
||||||
|
.expect("Multiple copies of scope router")
|
||||||
|
.register_handler(&path, Box::new(WrapHandler::new(handler)), None);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
/// Register a scope middleware
|
/// Register a scope middleware
|
||||||
///
|
///
|
||||||
/// This is similar to `App's` middlewares, but
|
/// This is similar to `App's` middlewares, but
|
||||||
@@ -301,6 +335,14 @@ impl<S: 'static> Scope<S> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn insert_slash(path: &str) -> String {
|
||||||
|
let mut path = path.to_owned();
|
||||||
|
if !path.is_empty() && !path.starts_with('/') {
|
||||||
|
path.insert(0, '/');
|
||||||
|
};
|
||||||
|
path
|
||||||
|
}
|
||||||
|
|
||||||
impl<S: 'static> RouteHandler<S> for Scope<S> {
|
impl<S: 'static> RouteHandler<S> for Scope<S> {
|
||||||
fn handle(&self, req: &HttpRequest<S>) -> AsyncResult<HttpResponse> {
|
fn handle(&self, req: &HttpRequest<S>) -> AsyncResult<HttpResponse> {
|
||||||
let tail = req.match_info().tail as usize;
|
let tail = req.match_info().tail as usize;
|
||||||
@@ -803,6 +845,34 @@ mod tests {
|
|||||||
assert_eq!(resp.as_msg().status(), StatusCode::NOT_FOUND);
|
assert_eq!(resp.as_msg().status(), StatusCode::NOT_FOUND);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_scope_route_without_leading_slash() {
|
||||||
|
let app = App::new()
|
||||||
|
.scope("app", |scope| {
|
||||||
|
scope
|
||||||
|
.route("path1", Method::GET, |_: HttpRequest<_>| HttpResponse::Ok())
|
||||||
|
.route("path1", Method::DELETE, |_: HttpRequest<_>| {
|
||||||
|
HttpResponse::Ok()
|
||||||
|
})
|
||||||
|
}).finish();
|
||||||
|
|
||||||
|
let req = TestRequest::with_uri("/app/path1").request();
|
||||||
|
let resp = app.run(req);
|
||||||
|
assert_eq!(resp.as_msg().status(), StatusCode::OK);
|
||||||
|
|
||||||
|
let req = TestRequest::with_uri("/app/path1")
|
||||||
|
.method(Method::DELETE)
|
||||||
|
.request();
|
||||||
|
let resp = app.run(req);
|
||||||
|
assert_eq!(resp.as_msg().status(), StatusCode::OK);
|
||||||
|
|
||||||
|
let req = TestRequest::with_uri("/app/path1")
|
||||||
|
.method(Method::POST)
|
||||||
|
.request();
|
||||||
|
let resp = app.run(req);
|
||||||
|
assert_eq!(resp.as_msg().status(), StatusCode::NOT_FOUND);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_scope_filter() {
|
fn test_scope_filter() {
|
||||||
let app = App::new()
|
let app = App::new()
|
||||||
@@ -972,6 +1042,20 @@ mod tests {
|
|||||||
assert_eq!(resp.as_msg().status(), StatusCode::CREATED);
|
assert_eq!(resp.as_msg().status(), StatusCode::CREATED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_nested_scope_no_slash() {
|
||||||
|
let app = App::new()
|
||||||
|
.scope("/app", |scope| {
|
||||||
|
scope.nested("t1", |scope| {
|
||||||
|
scope.resource("/path1", |r| r.f(|_| HttpResponse::Created()))
|
||||||
|
})
|
||||||
|
}).finish();
|
||||||
|
|
||||||
|
let req = TestRequest::with_uri("/app/t1/path1").request();
|
||||||
|
let resp = app.run(req);
|
||||||
|
assert_eq!(resp.as_msg().status(), StatusCode::CREATED);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_nested_scope_root() {
|
fn test_nested_scope_root() {
|
||||||
let app = App::new()
|
let app = App::new()
|
||||||
@@ -1120,4 +1204,32 @@ mod tests {
|
|||||||
let resp = app.run(req);
|
let resp = app.run(req);
|
||||||
assert_eq!(resp.as_msg().status(), StatusCode::METHOD_NOT_ALLOWED);
|
assert_eq!(resp.as_msg().status(), StatusCode::METHOD_NOT_ALLOWED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_handler() {
|
||||||
|
let app = App::new()
|
||||||
|
.scope("/scope", |scope| {
|
||||||
|
scope.handler("/test", |_: &_| HttpResponse::Ok())
|
||||||
|
}).finish();
|
||||||
|
|
||||||
|
let req = TestRequest::with_uri("/scope/test").request();
|
||||||
|
let resp = app.run(req);
|
||||||
|
assert_eq!(resp.as_msg().status(), StatusCode::OK);
|
||||||
|
|
||||||
|
let req = TestRequest::with_uri("/scope/test/").request();
|
||||||
|
let resp = app.run(req);
|
||||||
|
assert_eq!(resp.as_msg().status(), StatusCode::OK);
|
||||||
|
|
||||||
|
let req = TestRequest::with_uri("/scope/test/app").request();
|
||||||
|
let resp = app.run(req);
|
||||||
|
assert_eq!(resp.as_msg().status(), StatusCode::OK);
|
||||||
|
|
||||||
|
let req = TestRequest::with_uri("/scope/testapp").request();
|
||||||
|
let resp = app.run(req);
|
||||||
|
assert_eq!(resp.as_msg().status(), StatusCode::NOT_FOUND);
|
||||||
|
|
||||||
|
let req = TestRequest::with_uri("/scope/blah").request();
|
||||||
|
let resp = app.run(req);
|
||||||
|
assert_eq!(resp.as_msg().status(), StatusCode::NOT_FOUND);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@@ -94,6 +94,7 @@ where
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let mut is_eof = false;
|
||||||
let kind = match self.proto {
|
let kind = match self.proto {
|
||||||
Some(HttpProtocol::H1(ref mut h1)) => {
|
Some(HttpProtocol::H1(ref mut h1)) => {
|
||||||
let result = h1.poll();
|
let result = h1.poll();
|
||||||
@@ -120,16 +121,27 @@ where
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
Some(HttpProtocol::Unknown(_, _, ref mut io, ref mut buf)) => {
|
Some(HttpProtocol::Unknown(_, _, ref mut io, ref mut buf)) => {
|
||||||
|
let mut disconnect = false;
|
||||||
match io.read_available(buf) {
|
match io.read_available(buf) {
|
||||||
Ok(Async::Ready(true)) | Err(_) => {
|
Ok(Async::Ready((read_some, stream_closed))) => {
|
||||||
|
is_eof = stream_closed;
|
||||||
|
// Only disconnect if no data was read.
|
||||||
|
if is_eof && !read_some {
|
||||||
|
disconnect = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
disconnect = true;
|
||||||
|
}
|
||||||
|
_ => (),
|
||||||
|
}
|
||||||
|
if disconnect {
|
||||||
debug!("Ignored premature client disconnection");
|
debug!("Ignored premature client disconnection");
|
||||||
if let Some(n) = self.node.as_mut() {
|
if let Some(n) = self.node.as_mut() {
|
||||||
n.remove()
|
n.remove()
|
||||||
};
|
};
|
||||||
return Err(());
|
return Err(());
|
||||||
}
|
}
|
||||||
_ => (),
|
|
||||||
}
|
|
||||||
|
|
||||||
if buf.len() >= 14 {
|
if buf.len() >= 14 {
|
||||||
if buf[..14] == HTTP2_PREFACE[..] {
|
if buf[..14] == HTTP2_PREFACE[..] {
|
||||||
@@ -148,8 +160,9 @@ where
|
|||||||
if let Some(HttpProtocol::Unknown(settings, addr, io, buf)) = self.proto.take() {
|
if let Some(HttpProtocol::Unknown(settings, addr, io, buf)) = self.proto.take() {
|
||||||
match kind {
|
match kind {
|
||||||
ProtocolKind::Http1 => {
|
ProtocolKind::Http1 => {
|
||||||
self.proto =
|
self.proto = Some(HttpProtocol::H1(h1::Http1::new(
|
||||||
Some(HttpProtocol::H1(h1::Http1::new(settings, io, addr, buf)));
|
settings, io, addr, buf, is_eof,
|
||||||
|
)));
|
||||||
return self.poll();
|
return self.poll();
|
||||||
}
|
}
|
||||||
ProtocolKind::Http2 => {
|
ProtocolKind::Http2 => {
|
||||||
|
@@ -21,7 +21,12 @@ impl HttpHandlerTask for ServerError {
|
|||||||
bytes.reserve(helpers::STATUS_LINE_BUF_SIZE + 1);
|
bytes.reserve(helpers::STATUS_LINE_BUF_SIZE + 1);
|
||||||
helpers::write_status_line(self.0, self.1.as_u16(), bytes);
|
helpers::write_status_line(self.0, self.1.as_u16(), bytes);
|
||||||
}
|
}
|
||||||
|
// Convert Status Code to Reason.
|
||||||
|
let reason = self.1.canonical_reason().unwrap_or("");
|
||||||
|
io.buffer().extend_from_slice(reason.as_bytes());
|
||||||
|
// No response body.
|
||||||
io.buffer().extend_from_slice(b"\r\ncontent-length: 0\r\n");
|
io.buffer().extend_from_slice(b"\r\ncontent-length: 0\r\n");
|
||||||
|
// date header
|
||||||
io.set_date();
|
io.set_date();
|
||||||
Ok(Async::Ready(true))
|
Ok(Async::Ready(true))
|
||||||
}
|
}
|
||||||
|
130
src/server/h1.rs
130
src/server/h1.rs
@@ -22,13 +22,14 @@ use super::{HttpHandler, HttpHandlerTask, IoStream};
|
|||||||
const MAX_PIPELINED_MESSAGES: usize = 16;
|
const MAX_PIPELINED_MESSAGES: usize = 16;
|
||||||
|
|
||||||
bitflags! {
|
bitflags! {
|
||||||
struct Flags: u8 {
|
pub struct Flags: u8 {
|
||||||
const STARTED = 0b0000_0001;
|
const STARTED = 0b0000_0001;
|
||||||
const ERROR = 0b0000_0010;
|
const ERROR = 0b0000_0010;
|
||||||
const KEEPALIVE = 0b0000_0100;
|
const KEEPALIVE = 0b0000_0100;
|
||||||
const SHUTDOWN = 0b0000_1000;
|
const SHUTDOWN = 0b0000_1000;
|
||||||
const DISCONNECTED = 0b0001_0000;
|
const READ_DISCONNECTED = 0b0001_0000;
|
||||||
const POLLED = 0b0010_0000;
|
const WRITE_DISCONNECTED = 0b0010_0000;
|
||||||
|
const POLLED = 0b0100_0000;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -90,10 +91,14 @@ where
|
|||||||
{
|
{
|
||||||
pub fn new(
|
pub fn new(
|
||||||
settings: Rc<WorkerSettings<H>>, stream: T, addr: Option<SocketAddr>,
|
settings: Rc<WorkerSettings<H>>, stream: T, addr: Option<SocketAddr>,
|
||||||
buf: BytesMut,
|
buf: BytesMut, is_eof: bool,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Http1 {
|
Http1 {
|
||||||
flags: Flags::KEEPALIVE,
|
flags: if is_eof {
|
||||||
|
Flags::READ_DISCONNECTED
|
||||||
|
} else {
|
||||||
|
Flags::KEEPALIVE
|
||||||
|
},
|
||||||
stream: H1Writer::new(stream, Rc::clone(&settings)),
|
stream: H1Writer::new(stream, Rc::clone(&settings)),
|
||||||
decoder: H1Decoder::new(),
|
decoder: H1Decoder::new(),
|
||||||
payload: None,
|
payload: None,
|
||||||
@@ -117,6 +122,13 @@ where
|
|||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn can_read(&self) -> bool {
|
fn can_read(&self) -> bool {
|
||||||
|
if self
|
||||||
|
.flags
|
||||||
|
.intersects(Flags::ERROR | Flags::READ_DISCONNECTED)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
if let Some(ref info) = self.payload {
|
if let Some(ref info) = self.payload {
|
||||||
info.need_read() == PayloadStatus::Read
|
info.need_read() == PayloadStatus::Read
|
||||||
} else {
|
} else {
|
||||||
@@ -125,6 +137,8 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn notify_disconnect(&mut self) {
|
fn notify_disconnect(&mut self) {
|
||||||
|
self.flags.insert(Flags::WRITE_DISCONNECTED);
|
||||||
|
|
||||||
// notify all tasks
|
// notify all tasks
|
||||||
self.stream.disconnected();
|
self.stream.disconnected();
|
||||||
for task in &mut self.tasks {
|
for task in &mut self.tasks {
|
||||||
@@ -132,6 +146,21 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn client_disconnect(&mut self) {
|
||||||
|
// notify all tasks
|
||||||
|
self.notify_disconnect();
|
||||||
|
// kill keepalive
|
||||||
|
self.keepalive_timer.take();
|
||||||
|
|
||||||
|
// on parse error, stop reading stream but tasks need to be
|
||||||
|
// completed
|
||||||
|
self.flags.insert(Flags::ERROR);
|
||||||
|
|
||||||
|
if let Some(mut payload) = self.payload.take() {
|
||||||
|
payload.set_error(PayloadError::Incomplete);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn poll(&mut self) -> Poll<(), ()> {
|
pub fn poll(&mut self) -> Poll<(), ()> {
|
||||||
// keep-alive timer
|
// keep-alive timer
|
||||||
@@ -148,6 +177,11 @@ where
|
|||||||
|
|
||||||
// shutdown
|
// shutdown
|
||||||
if self.flags.contains(Flags::SHUTDOWN) {
|
if self.flags.contains(Flags::SHUTDOWN) {
|
||||||
|
if self.flags.intersects(
|
||||||
|
Flags::ERROR | Flags::READ_DISCONNECTED | Flags::WRITE_DISCONNECTED,
|
||||||
|
) {
|
||||||
|
return Ok(Async::Ready(()));
|
||||||
|
}
|
||||||
match self.stream.poll_completed(true) {
|
match self.stream.poll_completed(true) {
|
||||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||||
Ok(Async::Ready(_)) => return Ok(Async::Ready(())),
|
Ok(Async::Ready(_)) => return Ok(Async::Ready(())),
|
||||||
@@ -182,44 +216,25 @@ where
|
|||||||
self.flags.insert(Flags::POLLED);
|
self.flags.insert(Flags::POLLED);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// read io from socket
|
// read io from socket
|
||||||
if !self.flags.intersects(Flags::ERROR)
|
if self.can_read() && self.tasks.len() < MAX_PIPELINED_MESSAGES {
|
||||||
&& self.tasks.len() < MAX_PIPELINED_MESSAGES
|
|
||||||
&& self.can_read()
|
|
||||||
{
|
|
||||||
match self.stream.get_mut().read_available(&mut self.buf) {
|
match self.stream.get_mut().read_available(&mut self.buf) {
|
||||||
Ok(Async::Ready(disconnected)) => {
|
Ok(Async::Ready((read_some, disconnected))) => {
|
||||||
if disconnected {
|
if read_some {
|
||||||
// notify all tasks
|
|
||||||
self.notify_disconnect();
|
|
||||||
// kill keepalive
|
|
||||||
self.keepalive_timer.take();
|
|
||||||
|
|
||||||
// on parse error, stop reading stream but tasks need to be
|
|
||||||
// completed
|
|
||||||
self.flags.insert(Flags::ERROR);
|
|
||||||
|
|
||||||
if let Some(mut payload) = self.payload.take() {
|
|
||||||
payload.set_error(PayloadError::Incomplete);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
self.parse();
|
self.parse();
|
||||||
}
|
}
|
||||||
|
if disconnected {
|
||||||
|
// delay disconnect until all tasks have finished.
|
||||||
|
self.flags.insert(Flags::READ_DISCONNECTED);
|
||||||
|
if self.tasks.is_empty() {
|
||||||
|
self.client_disconnect();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Ok(Async::NotReady) => (),
|
Ok(Async::NotReady) => (),
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
// notify all tasks
|
self.client_disconnect();
|
||||||
self.notify_disconnect();
|
|
||||||
// kill keepalive
|
|
||||||
self.keepalive_timer.take();
|
|
||||||
|
|
||||||
// on parse error, stop reading stream but tasks need to be
|
|
||||||
// completed
|
|
||||||
self.flags.insert(Flags::ERROR);
|
|
||||||
|
|
||||||
if let Some(mut payload) = self.payload.take() {
|
|
||||||
payload.set_error(PayloadError::Incomplete);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -233,7 +248,10 @@ where
|
|||||||
let mut idx = 0;
|
let mut idx = 0;
|
||||||
while idx < self.tasks.len() {
|
while idx < self.tasks.len() {
|
||||||
// only one task can do io operation in http/1
|
// only one task can do io operation in http/1
|
||||||
if !io && !self.tasks[idx].flags.contains(EntryFlags::EOF) {
|
if !io
|
||||||
|
&& !self.tasks[idx].flags.contains(EntryFlags::EOF)
|
||||||
|
&& !self.flags.contains(Flags::WRITE_DISCONNECTED)
|
||||||
|
{
|
||||||
// io is corrupted, send buffer
|
// io is corrupted, send buffer
|
||||||
if self.tasks[idx].flags.contains(EntryFlags::ERROR) {
|
if self.tasks[idx].flags.contains(EntryFlags::ERROR) {
|
||||||
if let Ok(Async::NotReady) = self.stream.poll_completed(true) {
|
if let Ok(Async::NotReady) = self.stream.poll_completed(true) {
|
||||||
@@ -297,7 +315,6 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
// cleanup finished tasks
|
// cleanup finished tasks
|
||||||
let max = self.tasks.len() >= MAX_PIPELINED_MESSAGES;
|
|
||||||
while !self.tasks.is_empty() {
|
while !self.tasks.is_empty() {
|
||||||
if self.tasks[0]
|
if self.tasks[0]
|
||||||
.flags
|
.flags
|
||||||
@@ -308,10 +325,6 @@ where
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// read more message
|
|
||||||
if max && self.tasks.len() >= MAX_PIPELINED_MESSAGES {
|
|
||||||
return Ok(Async::Ready(true));
|
|
||||||
}
|
|
||||||
|
|
||||||
// check stream state
|
// check stream state
|
||||||
if self.flags.contains(Flags::STARTED) {
|
if self.flags.contains(Flags::STARTED) {
|
||||||
@@ -331,8 +344,12 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// deal with keep-alive
|
// deal with keep-alive and steam eof (client-side write shutdown)
|
||||||
if self.tasks.is_empty() {
|
if self.tasks.is_empty() {
|
||||||
|
// handle stream eof
|
||||||
|
if self.flags.contains(Flags::READ_DISCONNECTED) {
|
||||||
|
return Ok(Async::Ready(false));
|
||||||
|
}
|
||||||
// no keep-alive
|
// no keep-alive
|
||||||
if self.flags.contains(Flags::ERROR)
|
if self.flags.contains(Flags::ERROR)
|
||||||
|| (!self.flags.contains(Flags::KEEPALIVE)
|
|| (!self.flags.contains(Flags::KEEPALIVE)
|
||||||
@@ -448,7 +465,14 @@ where
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(None) => break,
|
Ok(None) => {
|
||||||
|
if self.flags.contains(Flags::READ_DISCONNECTED)
|
||||||
|
&& self.tasks.is_empty()
|
||||||
|
{
|
||||||
|
self.client_disconnect();
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
self.flags.insert(Flags::ERROR);
|
self.flags.insert(Flags::ERROR);
|
||||||
if let Some(mut payload) = self.payload.take() {
|
if let Some(mut payload) = self.payload.take() {
|
||||||
@@ -603,24 +627,36 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_req_parse() {
|
fn test_req_parse1() {
|
||||||
let buf = Buffer::new("GET /test HTTP/1.1\r\n\r\n");
|
let buf = Buffer::new("GET /test HTTP/1.1\r\n\r\n");
|
||||||
let readbuf = BytesMut::new();
|
let readbuf = BytesMut::new();
|
||||||
let settings = Rc::new(wrk_settings());
|
let settings = Rc::new(wrk_settings());
|
||||||
|
|
||||||
let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf);
|
let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf, false);
|
||||||
h1.poll_io();
|
h1.poll_io();
|
||||||
h1.poll_io();
|
h1.poll_io();
|
||||||
assert_eq!(h1.tasks.len(), 1);
|
assert_eq!(h1.tasks.len(), 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_req_parse2() {
|
||||||
|
let buf = Buffer::new("");
|
||||||
|
let readbuf =
|
||||||
|
BytesMut::from(Vec::<u8>::from(&b"GET /test HTTP/1.1\r\n\r\n"[..]));
|
||||||
|
let settings = Rc::new(wrk_settings());
|
||||||
|
|
||||||
|
let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf, true);
|
||||||
|
h1.poll_io();
|
||||||
|
assert_eq!(h1.tasks.len(), 1);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_req_parse_err() {
|
fn test_req_parse_err() {
|
||||||
let buf = Buffer::new("GET /test HTTP/1\r\n\r\n");
|
let buf = Buffer::new("GET /test HTTP/1\r\n\r\n");
|
||||||
let readbuf = BytesMut::new();
|
let readbuf = BytesMut::new();
|
||||||
let settings = Rc::new(wrk_settings());
|
let settings = Rc::new(wrk_settings());
|
||||||
|
|
||||||
let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf);
|
let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf, false);
|
||||||
h1.poll_io();
|
h1.poll_io();
|
||||||
h1.poll_io();
|
h1.poll_io();
|
||||||
assert!(h1.flags.contains(Flags::ERROR));
|
assert!(h1.flags.contains(Flags::ERROR));
|
||||||
|
@@ -63,7 +63,9 @@ impl<T: AsyncWrite, H: 'static> H1Writer<T, H> {
|
|||||||
self.flags = Flags::KEEPALIVE;
|
self.flags = Flags::KEEPALIVE;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn disconnected(&mut self) {}
|
pub fn disconnected(&mut self) {
|
||||||
|
self.flags.insert(Flags::DISCONNECTED);
|
||||||
|
}
|
||||||
|
|
||||||
pub fn keepalive(&self) -> bool {
|
pub fn keepalive(&self) -> bool {
|
||||||
self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE)
|
self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE)
|
||||||
@@ -268,10 +270,7 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
|
|||||||
let pl: &[u8] = payload.as_ref();
|
let pl: &[u8] = payload.as_ref();
|
||||||
let n = match Self::write_data(&mut self.stream, pl) {
|
let n = match Self::write_data(&mut self.stream, pl) {
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
if err.kind() == io::ErrorKind::WriteZero {
|
|
||||||
self.disconnected();
|
self.disconnected();
|
||||||
}
|
|
||||||
|
|
||||||
return Err(err);
|
return Err(err);
|
||||||
}
|
}
|
||||||
Ok(val) => val,
|
Ok(val) => val,
|
||||||
@@ -315,14 +314,15 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
|
|||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn poll_completed(&mut self, shutdown: bool) -> Poll<(), io::Error> {
|
fn poll_completed(&mut self, shutdown: bool) -> Poll<(), io::Error> {
|
||||||
|
if self.flags.contains(Flags::DISCONNECTED) {
|
||||||
|
return Err(io::Error::new(io::ErrorKind::Other, "disconnected"));
|
||||||
|
}
|
||||||
|
|
||||||
if !self.buffer.is_empty() {
|
if !self.buffer.is_empty() {
|
||||||
let written = {
|
let written = {
|
||||||
match Self::write_data(&mut self.stream, self.buffer.as_ref().as_ref()) {
|
match Self::write_data(&mut self.stream, self.buffer.as_ref().as_ref()) {
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
if err.kind() == io::ErrorKind::WriteZero {
|
|
||||||
self.disconnected();
|
self.disconnected();
|
||||||
}
|
|
||||||
|
|
||||||
return Err(err);
|
return Err(err);
|
||||||
}
|
}
|
||||||
Ok(val) => val,
|
Ok(val) => val,
|
||||||
@@ -339,7 +339,7 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
|
|||||||
self.stream.poll_flush()?;
|
self.stream.poll_flush()?;
|
||||||
self.stream.shutdown()
|
self.stream.shutdown()
|
||||||
} else {
|
} else {
|
||||||
self.stream.poll_flush()
|
Ok(self.stream.poll_flush()?)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -390,7 +390,7 @@ pub trait IoStream: AsyncRead + AsyncWrite + 'static {
|
|||||||
|
|
||||||
fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()>;
|
fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()>;
|
||||||
|
|
||||||
fn read_available(&mut self, buf: &mut BytesMut) -> Poll<bool, io::Error> {
|
fn read_available(&mut self, buf: &mut BytesMut) -> Poll<(bool, bool), io::Error> {
|
||||||
let mut read_some = false;
|
let mut read_some = false;
|
||||||
loop {
|
loop {
|
||||||
if buf.remaining_mut() < LW_BUFFER_SIZE {
|
if buf.remaining_mut() < LW_BUFFER_SIZE {
|
||||||
@@ -400,7 +400,7 @@ pub trait IoStream: AsyncRead + AsyncWrite + 'static {
|
|||||||
match self.read(buf.bytes_mut()) {
|
match self.read(buf.bytes_mut()) {
|
||||||
Ok(n) => {
|
Ok(n) => {
|
||||||
if n == 0 {
|
if n == 0 {
|
||||||
return Ok(Async::Ready(!read_some));
|
return Ok(Async::Ready((read_some, true)));
|
||||||
} else {
|
} else {
|
||||||
read_some = true;
|
read_some = true;
|
||||||
buf.advance_mut(n);
|
buf.advance_mut(n);
|
||||||
@@ -409,7 +409,7 @@ pub trait IoStream: AsyncRead + AsyncWrite + 'static {
|
|||||||
Err(e) => {
|
Err(e) => {
|
||||||
return if e.kind() == io::ErrorKind::WouldBlock {
|
return if e.kind() == io::ErrorKind::WouldBlock {
|
||||||
if read_some {
|
if read_some {
|
||||||
Ok(Async::Ready(false))
|
Ok(Async::Ready((read_some, false)))
|
||||||
} else {
|
} else {
|
||||||
Ok(Async::NotReady)
|
Ok(Async::NotReady)
|
||||||
}
|
}
|
||||||
|
@@ -217,7 +217,7 @@ impl Server {
|
|||||||
// start accept thread
|
// start accept thread
|
||||||
for sock in &self.sockets {
|
for sock in &self.sockets {
|
||||||
for s in sock.iter() {
|
for s in sock.iter() {
|
||||||
info!("Starting server on http://{:?}", s.1.local_addr().ok());
|
info!("Starting server on http://{}", s.1.local_addr().unwrap());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let rx = self
|
let rx = self
|
||||||
|
@@ -8,7 +8,8 @@ extern crate rand;
|
|||||||
#[cfg(all(unix, feature = "uds"))]
|
#[cfg(all(unix, feature = "uds"))]
|
||||||
extern crate tokio_uds;
|
extern crate tokio_uds;
|
||||||
|
|
||||||
use std::io::Read;
|
use std::io::{Read, Write};
|
||||||
|
use std::{net, thread};
|
||||||
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use flate2::read::GzDecoder;
|
use flate2::read::GzDecoder;
|
||||||
@@ -66,6 +67,16 @@ fn test_simple() {
|
|||||||
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
|
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_connection_close() {
|
||||||
|
let mut srv =
|
||||||
|
test::TestServer::new(|app| app.handler(|_| HttpResponse::Ok().body(STR)));
|
||||||
|
|
||||||
|
let request = srv.get().header("Connection", "close").finish().unwrap();
|
||||||
|
let response = srv.execute(request.send()).unwrap();
|
||||||
|
assert!(response.status().is_success());
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_with_query_parameter() {
|
fn test_with_query_parameter() {
|
||||||
let mut srv = test::TestServer::new(|app| {
|
let mut srv = test::TestServer::new(|app| {
|
||||||
@@ -460,3 +471,33 @@ fn test_default_headers() {
|
|||||||
"\""
|
"\""
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn client_read_until_eof() {
|
||||||
|
let addr = test::TestServer::unused_addr();
|
||||||
|
|
||||||
|
thread::spawn(move || {
|
||||||
|
let lst = net::TcpListener::bind(addr).unwrap();
|
||||||
|
|
||||||
|
for stream in lst.incoming() {
|
||||||
|
let mut stream = stream.unwrap();
|
||||||
|
let mut b = [0; 1000];
|
||||||
|
let _ = stream.read(&mut b).unwrap();
|
||||||
|
let _ = stream
|
||||||
|
.write_all(b"HTTP/1.1 200 OK\r\nconnection: close\r\n\r\nwelcome!");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut sys = actix::System::new("test");
|
||||||
|
|
||||||
|
// client request
|
||||||
|
let req = client::ClientRequest::get(format!("http://{}/", addr).as_str())
|
||||||
|
.finish()
|
||||||
|
.unwrap();
|
||||||
|
let response = sys.block_on(req.send()).unwrap();
|
||||||
|
assert!(response.status().is_success());
|
||||||
|
|
||||||
|
// read response
|
||||||
|
let bytes = sys.block_on(response.body()).unwrap();
|
||||||
|
assert_eq!(bytes, Bytes::from_static(b"welcome!"));
|
||||||
|
}
|
||||||
|
@@ -932,6 +932,28 @@ fn test_application() {
|
|||||||
assert!(response.status().is_success());
|
assert!(response.status().is_success());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_default_404_handler_response() {
|
||||||
|
let mut srv = test::TestServer::with_factory(|| {
|
||||||
|
App::new()
|
||||||
|
.prefix("/app")
|
||||||
|
.resource("", |r| r.f(|_| HttpResponse::Ok()))
|
||||||
|
.resource("/", |r| r.f(|_| HttpResponse::Ok()))
|
||||||
|
});
|
||||||
|
let addr = srv.addr();
|
||||||
|
|
||||||
|
let mut buf = [0; 24];
|
||||||
|
let request = TcpStream::connect(&addr)
|
||||||
|
.and_then(|sock| {
|
||||||
|
tokio::io::write_all(sock, "HEAD / HTTP/1.1\r\nHost: localhost\r\n\r\n")
|
||||||
|
.and_then(|(sock, _)| tokio::io::read_exact(sock, &mut buf))
|
||||||
|
.and_then(|(_, buf)| Ok(buf))
|
||||||
|
}).map_err(|e| panic!("{:?}", e));
|
||||||
|
let response = srv.execute(request).unwrap();
|
||||||
|
let rep = String::from_utf8_lossy(&response[..]);
|
||||||
|
assert!(rep.contains("HTTP/1.1 404 Not Found"));
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_server_cookies() {
|
fn test_server_cookies() {
|
||||||
use actix_web::http;
|
use actix_web::http;
|
||||||
|
Reference in New Issue
Block a user