mirror of
https://github.com/fafhrd91/actix-web
synced 2025-07-16 06:35:46 +02:00
Compare commits
29 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
4ca9fd2ad1 | ||
|
f0f67072ae | ||
|
24d1228943 | ||
|
b7a73e0a4f | ||
|
968c81e267 | ||
|
d5957a8466 | ||
|
f2f05e7715 | ||
|
3439f55288 | ||
|
0425e2776f | ||
|
6464f96f8b | ||
|
a2b170fec9 | ||
|
0b42cae082 | ||
|
c313c003a4 | ||
|
3fa23f5e10 | ||
|
2d51831899 | ||
|
e59abfd716 | ||
|
66881d7dd1 | ||
|
a42a8a2321 | ||
|
2341656173 | ||
|
487519acec | ||
|
af6caa92c8 | ||
|
3ccbce6bc8 | ||
|
797b52ecbf | ||
|
4bab50c861 | ||
|
5906971b6d | ||
|
8393d09a0f | ||
|
c3ae9997fc | ||
|
d39dcc58cd | ||
|
471a3e9806 |
21
CHANGES.md
21
CHANGES.md
@@ -1,5 +1,26 @@
|
||||
# Changes
|
||||
|
||||
## [0.7.5] - 2018-09-04
|
||||
|
||||
### Added
|
||||
|
||||
* Added the ability to pass a custom `TlsConnector`.
|
||||
|
||||
* Allow to register handlers on scope level #465
|
||||
|
||||
|
||||
### Fixed
|
||||
|
||||
* Handle socket read disconnect
|
||||
|
||||
* Handling scoped paths without leading slashes #460
|
||||
|
||||
|
||||
### Changed
|
||||
|
||||
* Read client response until eof if connection header set to close #464
|
||||
|
||||
|
||||
## [0.7.4] - 2018-08-23
|
||||
|
||||
### Added
|
||||
|
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "actix-web"
|
||||
version = "0.7.4"
|
||||
version = "0.7.5"
|
||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
description = "Actix web is a simple, pragmatic and extremely fast web framework for Rust."
|
||||
readme = "README.md"
|
||||
|
@@ -447,11 +447,8 @@ where
|
||||
{
|
||||
let mut path = path.trim().trim_right_matches('/').to_owned();
|
||||
if !path.is_empty() && !path.starts_with('/') {
|
||||
path.insert(0, '/')
|
||||
}
|
||||
if path.len() > 1 && path.ends_with('/') {
|
||||
path.pop();
|
||||
}
|
||||
path.insert(0, '/');
|
||||
};
|
||||
self.parts
|
||||
.as_mut()
|
||||
.expect("Use after finish")
|
||||
|
@@ -17,57 +17,34 @@ use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use tokio_timer::Delay;
|
||||
|
||||
#[cfg(feature = "alpn")]
|
||||
use openssl::ssl::{Error as OpensslError, SslConnector, SslMethod};
|
||||
#[cfg(feature = "alpn")]
|
||||
use tokio_openssl::SslConnectorExt;
|
||||
use {
|
||||
openssl::ssl::{Error as SslError, SslConnector, SslMethod},
|
||||
tokio_openssl::SslConnectorExt,
|
||||
};
|
||||
|
||||
#[cfg(all(feature = "tls", not(feature = "alpn")))]
|
||||
use native_tls::{Error as TlsError, TlsConnector as NativeTlsConnector};
|
||||
#[cfg(all(feature = "tls", not(feature = "alpn")))]
|
||||
use tokio_tls::{TlsConnector};
|
||||
use {
|
||||
native_tls::{Error as SslError, TlsConnector as NativeTlsConnector},
|
||||
tokio_tls::TlsConnector as SslConnector,
|
||||
};
|
||||
|
||||
#[cfg(
|
||||
all(
|
||||
feature = "rust-tls",
|
||||
not(any(feature = "alpn", feature = "tls"))
|
||||
)
|
||||
)]
|
||||
use rustls::ClientConfig;
|
||||
#[cfg(
|
||||
all(
|
||||
feature = "rust-tls",
|
||||
not(any(feature = "alpn", feature = "tls"))
|
||||
)
|
||||
)]
|
||||
use std::io::Error as TLSError;
|
||||
#[cfg(
|
||||
all(
|
||||
feature = "rust-tls",
|
||||
not(any(feature = "alpn", feature = "tls"))
|
||||
)
|
||||
)]
|
||||
use std::sync::Arc;
|
||||
#[cfg(
|
||||
all(
|
||||
feature = "rust-tls",
|
||||
not(any(feature = "alpn", feature = "tls"))
|
||||
)
|
||||
)]
|
||||
use tokio_rustls::ClientConfigExt;
|
||||
#[cfg(
|
||||
all(
|
||||
feature = "rust-tls",
|
||||
not(any(feature = "alpn", feature = "tls"))
|
||||
)
|
||||
)]
|
||||
use webpki::DNSNameRef;
|
||||
#[cfg(
|
||||
all(
|
||||
feature = "rust-tls",
|
||||
not(any(feature = "alpn", feature = "tls"))
|
||||
)
|
||||
)]
|
||||
use webpki_roots;
|
||||
#[cfg(all(
|
||||
feature = "rust-tls",
|
||||
not(any(feature = "alpn", feature = "tls"))
|
||||
))]
|
||||
use {
|
||||
rustls::ClientConfig, std::io::Error as SslError, std::sync::Arc,
|
||||
tokio_rustls::ClientConfigExt, webpki::DNSNameRef, webpki_roots,
|
||||
};
|
||||
|
||||
#[cfg(all(
|
||||
feature = "rust-tls",
|
||||
not(any(feature = "alpn", feature = "tls"))
|
||||
))]
|
||||
type SslConnector = Arc<ClientConfig>;
|
||||
|
||||
#[cfg(not(any(feature = "alpn", feature = "tls", feature = "rust-tls")))]
|
||||
type SslConnector = ();
|
||||
|
||||
use server::IoStream;
|
||||
use {HAS_OPENSSL, HAS_RUSTLS, HAS_TLS};
|
||||
@@ -173,24 +150,9 @@ pub enum ClientConnectorError {
|
||||
SslIsNotSupported,
|
||||
|
||||
/// SSL error
|
||||
#[cfg(feature = "alpn")]
|
||||
#[cfg(any(feature = "tls", feature = "alpn", feature = "rust-tls"))]
|
||||
#[fail(display = "{}", _0)]
|
||||
SslError(#[cause] OpensslError),
|
||||
|
||||
/// SSL error
|
||||
#[cfg(all(feature = "tls", not(feature = "alpn")))]
|
||||
#[fail(display = "{}", _0)]
|
||||
SslError(#[cause] TlsError),
|
||||
|
||||
/// SSL error
|
||||
#[cfg(
|
||||
all(
|
||||
feature = "rust-tls",
|
||||
not(any(feature = "alpn", feature = "tls"))
|
||||
)
|
||||
)]
|
||||
#[fail(display = "{}", _0)]
|
||||
SslError(#[cause] TLSError),
|
||||
SslError(#[cause] SslError),
|
||||
|
||||
/// Resolver error
|
||||
#[fail(display = "{}", _0)]
|
||||
@@ -242,17 +204,7 @@ impl Paused {
|
||||
/// `ClientConnector` type is responsible for transport layer of a
|
||||
/// client connection.
|
||||
pub struct ClientConnector {
|
||||
#[cfg(all(feature = "alpn"))]
|
||||
connector: SslConnector,
|
||||
#[cfg(all(feature = "tls", not(feature = "alpn")))]
|
||||
connector: TlsConnector,
|
||||
#[cfg(
|
||||
all(
|
||||
feature = "rust-tls",
|
||||
not(any(feature = "alpn", feature = "tls"))
|
||||
)
|
||||
)]
|
||||
connector: Arc<ClientConfig>,
|
||||
|
||||
stats: ClientConnectorStats,
|
||||
subscriber: Option<Recipient<ClientConnectorStats>>,
|
||||
@@ -293,71 +245,36 @@ impl SystemService for ClientConnector {}
|
||||
|
||||
impl Default for ClientConnector {
|
||||
fn default() -> ClientConnector {
|
||||
#[cfg(all(feature = "alpn"))]
|
||||
{
|
||||
let builder = SslConnector::builder(SslMethod::tls()).unwrap();
|
||||
ClientConnector::with_connector(builder.build())
|
||||
}
|
||||
#[cfg(all(feature = "tls", not(feature = "alpn")))]
|
||||
{
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
let builder = NativeTlsConnector::builder();
|
||||
ClientConnector {
|
||||
stats: ClientConnectorStats::default(),
|
||||
subscriber: None,
|
||||
acq_tx: tx,
|
||||
acq_rx: Some(rx),
|
||||
resolver: None,
|
||||
connector: builder.build().unwrap().into(),
|
||||
conn_lifetime: Duration::from_secs(75),
|
||||
conn_keep_alive: Duration::from_secs(15),
|
||||
limit: 100,
|
||||
limit_per_host: 0,
|
||||
acquired: 0,
|
||||
acquired_per_host: HashMap::new(),
|
||||
available: HashMap::new(),
|
||||
to_close: Vec::new(),
|
||||
waiters: Some(HashMap::new()),
|
||||
wait_timeout: None,
|
||||
paused: Paused::No,
|
||||
let connector = {
|
||||
#[cfg(all(feature = "alpn"))]
|
||||
{
|
||||
SslConnector::builder(SslMethod::tls()).unwrap().build()
|
||||
}
|
||||
}
|
||||
#[cfg(
|
||||
all(
|
||||
|
||||
#[cfg(all(feature = "tls", not(feature = "alpn")))]
|
||||
{
|
||||
NativeTlsConnector::builder().build().unwrap().into()
|
||||
}
|
||||
|
||||
#[cfg(all(
|
||||
feature = "rust-tls",
|
||||
not(any(feature = "alpn", feature = "tls"))
|
||||
)
|
||||
)]
|
||||
{
|
||||
let mut config = ClientConfig::new();
|
||||
config
|
||||
.root_store
|
||||
.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
|
||||
ClientConnector::with_connector(config)
|
||||
}
|
||||
|
||||
#[cfg(not(any(feature = "alpn", feature = "tls", feature = "rust-tls")))]
|
||||
{
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
ClientConnector {
|
||||
stats: ClientConnectorStats::default(),
|
||||
subscriber: None,
|
||||
acq_tx: tx,
|
||||
acq_rx: Some(rx),
|
||||
resolver: None,
|
||||
conn_lifetime: Duration::from_secs(75),
|
||||
conn_keep_alive: Duration::from_secs(15),
|
||||
limit: 100,
|
||||
limit_per_host: 0,
|
||||
acquired: 0,
|
||||
acquired_per_host: HashMap::new(),
|
||||
available: HashMap::new(),
|
||||
to_close: Vec::new(),
|
||||
waiters: Some(HashMap::new()),
|
||||
wait_timeout: None,
|
||||
paused: Paused::No,
|
||||
))]
|
||||
{
|
||||
let mut config = ClientConfig::new();
|
||||
config
|
||||
.root_store
|
||||
.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
|
||||
Arc::new(config)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(any(feature = "alpn", feature = "tls", feature = "rust-tls")))]
|
||||
{
|
||||
()
|
||||
}
|
||||
};
|
||||
|
||||
ClientConnector::with_connector_impl(connector)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -375,7 +292,6 @@ impl ClientConnector {
|
||||
/// # extern crate futures;
|
||||
/// # use futures::{future, Future};
|
||||
/// # use std::io::Write;
|
||||
/// # use std::process;
|
||||
/// # use actix_web::actix::Actor;
|
||||
/// extern crate openssl;
|
||||
/// use actix_web::{actix, client::ClientConnector, client::Connect};
|
||||
@@ -402,35 +318,14 @@ impl ClientConnector {
|
||||
/// }
|
||||
/// ```
|
||||
pub fn with_connector(connector: SslConnector) -> ClientConnector {
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
|
||||
ClientConnector {
|
||||
connector,
|
||||
stats: ClientConnectorStats::default(),
|
||||
subscriber: None,
|
||||
acq_tx: tx,
|
||||
acq_rx: Some(rx),
|
||||
resolver: None,
|
||||
conn_lifetime: Duration::from_secs(75),
|
||||
conn_keep_alive: Duration::from_secs(15),
|
||||
limit: 100,
|
||||
limit_per_host: 0,
|
||||
acquired: 0,
|
||||
acquired_per_host: HashMap::new(),
|
||||
available: HashMap::new(),
|
||||
to_close: Vec::new(),
|
||||
waiters: Some(HashMap::new()),
|
||||
wait_timeout: None,
|
||||
paused: Paused::No,
|
||||
}
|
||||
// keep level of indirection for docstrings matching featureflags
|
||||
Self::with_connector_impl(connector)
|
||||
}
|
||||
|
||||
#[cfg(
|
||||
all(
|
||||
feature = "rust-tls",
|
||||
not(any(feature = "alpn", feature = "tls"))
|
||||
)
|
||||
)]
|
||||
#[cfg(all(
|
||||
feature = "rust-tls",
|
||||
not(any(feature = "alpn", feature = "tls"))
|
||||
))]
|
||||
/// Create `ClientConnector` actor with custom `SslConnector` instance.
|
||||
///
|
||||
/// By default `ClientConnector` uses very a simple SSL configuration.
|
||||
@@ -441,10 +336,8 @@ impl ClientConnector {
|
||||
/// # #![cfg(feature = "rust-tls")]
|
||||
/// # extern crate actix_web;
|
||||
/// # extern crate futures;
|
||||
/// # extern crate tokio;
|
||||
/// # use futures::{future, Future};
|
||||
/// # use std::io::Write;
|
||||
/// # use std::process;
|
||||
/// # use actix_web::actix::Actor;
|
||||
/// extern crate rustls;
|
||||
/// extern crate webpki_roots;
|
||||
@@ -476,10 +369,61 @@ impl ClientConnector {
|
||||
/// }
|
||||
/// ```
|
||||
pub fn with_connector(connector: ClientConfig) -> ClientConnector {
|
||||
// keep level of indirection for docstrings matching featureflags
|
||||
Self::with_connector_impl(Arc::new(connector))
|
||||
}
|
||||
|
||||
#[cfg(all(
|
||||
feature = "tls",
|
||||
not(any(feature = "alpn", feature = "rust-tls"))
|
||||
))]
|
||||
/// Create `ClientConnector` actor with custom `SslConnector` instance.
|
||||
///
|
||||
/// By default `ClientConnector` uses very a simple SSL configuration.
|
||||
/// With `with_connector` method it is possible to use a custom
|
||||
/// `SslConnector` object.
|
||||
///
|
||||
/// ```rust
|
||||
/// # #![cfg(feature = "tls")]
|
||||
/// # extern crate actix_web;
|
||||
/// # extern crate futures;
|
||||
/// # use futures::{future, Future};
|
||||
/// # use std::io::Write;
|
||||
/// # use actix_web::actix::Actor;
|
||||
/// extern crate native_tls;
|
||||
/// extern crate webpki_roots;
|
||||
/// use native_tls::TlsConnector;
|
||||
/// use actix_web::{actix, client::ClientConnector, client::Connect};
|
||||
///
|
||||
/// fn main() {
|
||||
/// actix::run(|| {
|
||||
/// let connector = TlsConnector::new().unwrap();
|
||||
/// let conn = ClientConnector::with_connector(connector.into()).start();
|
||||
///
|
||||
/// conn.send(
|
||||
/// Connect::new("https://www.rust-lang.org").unwrap()) // <- connect to host
|
||||
/// .map_err(|_| ())
|
||||
/// .and_then(|res| {
|
||||
/// if let Ok(mut stream) = res {
|
||||
/// stream.write_all(b"GET / HTTP/1.0\r\n\r\n").unwrap();
|
||||
/// }
|
||||
/// # actix::System::current().stop();
|
||||
/// Ok(())
|
||||
/// })
|
||||
/// });
|
||||
/// }
|
||||
/// ```
|
||||
pub fn with_connector(connector: SslConnector) -> ClientConnector {
|
||||
// keep level of indirection for docstrings matching featureflags
|
||||
Self::with_connector_impl(connector)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn with_connector_impl(connector: SslConnector) -> ClientConnector {
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
|
||||
ClientConnector {
|
||||
connector: Arc::new(connector),
|
||||
connector,
|
||||
stats: ClientConnectorStats::default(),
|
||||
subscriber: None,
|
||||
acq_tx: tx,
|
||||
@@ -853,12 +797,10 @@ impl ClientConnector {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(
|
||||
all(
|
||||
feature = "rust-tls",
|
||||
not(any(feature = "alpn", feature = "tls"))
|
||||
)
|
||||
)]
|
||||
#[cfg(all(
|
||||
feature = "rust-tls",
|
||||
not(any(feature = "alpn", feature = "tls"))
|
||||
))]
|
||||
match res {
|
||||
Err(err) => {
|
||||
let _ = waiter.tx.send(Err(err.into()));
|
||||
@@ -1344,7 +1286,7 @@ impl AsyncWrite for Connection {
|
||||
}
|
||||
|
||||
#[cfg(feature = "tls")]
|
||||
use tokio_tls::{TlsStream};
|
||||
use tokio_tls::TlsStream;
|
||||
|
||||
#[cfg(feature = "tls")]
|
||||
/// This is temp solution untile actix-net migration
|
||||
@@ -1364,4 +1306,4 @@ impl<Io: IoStream> IoStream for TlsStream<Io> {
|
||||
fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()> {
|
||||
self.get_mut().get_mut().set_linger(dur)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -20,6 +20,7 @@ const MAX_HEADERS: usize = 96;
|
||||
#[derive(Default)]
|
||||
pub struct HttpResponseParser {
|
||||
decoder: Option<EncodingDecoder>,
|
||||
eof: bool, // indicate that we read payload until stream eof
|
||||
}
|
||||
|
||||
#[derive(Debug, Fail)]
|
||||
@@ -38,43 +39,42 @@ impl HttpResponseParser {
|
||||
where
|
||||
T: IoStream,
|
||||
{
|
||||
// if buf is empty parse_message will always return NotReady, let's avoid that
|
||||
if buf.is_empty() {
|
||||
loop {
|
||||
// Don't call parser until we have data to parse.
|
||||
if !buf.is_empty() {
|
||||
match HttpResponseParser::parse_message(buf)
|
||||
.map_err(HttpResponseParserError::Error)?
|
||||
{
|
||||
Async::Ready((msg, info)) => {
|
||||
if let Some((decoder, eof)) = info {
|
||||
self.eof = eof;
|
||||
self.decoder = Some(decoder);
|
||||
} else {
|
||||
self.eof = false;
|
||||
self.decoder = None;
|
||||
}
|
||||
return Ok(Async::Ready(msg));
|
||||
}
|
||||
Async::NotReady => {
|
||||
if buf.capacity() >= MAX_BUFFER_SIZE {
|
||||
return Err(HttpResponseParserError::Error(
|
||||
ParseError::TooLarge,
|
||||
));
|
||||
}
|
||||
// Parser needs more data.
|
||||
}
|
||||
}
|
||||
}
|
||||
// Read some more data into the buffer for the parser.
|
||||
match io.read_available(buf) {
|
||||
Ok(Async::Ready(true)) => {
|
||||
Ok(Async::Ready((false, true))) => {
|
||||
return Err(HttpResponseParserError::Disconnect)
|
||||
}
|
||||
Ok(Async::Ready(false)) => (),
|
||||
Ok(Async::Ready(_)) => (),
|
||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||
Err(err) => return Err(HttpResponseParserError::Error(err.into())),
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
match HttpResponseParser::parse_message(buf)
|
||||
.map_err(HttpResponseParserError::Error)?
|
||||
{
|
||||
Async::Ready((msg, decoder)) => {
|
||||
self.decoder = decoder;
|
||||
return Ok(Async::Ready(msg));
|
||||
}
|
||||
Async::NotReady => {
|
||||
if buf.capacity() >= MAX_BUFFER_SIZE {
|
||||
return Err(HttpResponseParserError::Error(ParseError::TooLarge));
|
||||
}
|
||||
match io.read_available(buf) {
|
||||
Ok(Async::Ready(true)) => {
|
||||
return Err(HttpResponseParserError::Disconnect)
|
||||
}
|
||||
Ok(Async::Ready(false)) => (),
|
||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||
Err(err) => {
|
||||
return Err(HttpResponseParserError::Error(err.into()))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn parse_payload<T>(
|
||||
@@ -87,8 +87,8 @@ impl HttpResponseParser {
|
||||
loop {
|
||||
// read payload
|
||||
let (not_ready, stream_finished) = match io.read_available(buf) {
|
||||
Ok(Async::Ready(true)) => (false, true),
|
||||
Ok(Async::Ready(false)) => (false, false),
|
||||
Ok(Async::Ready((_, true))) => (false, true),
|
||||
Ok(Async::Ready((_, false))) => (false, false),
|
||||
Ok(Async::NotReady) => (true, false),
|
||||
Err(err) => return Err(err.into()),
|
||||
};
|
||||
@@ -104,7 +104,12 @@ impl HttpResponseParser {
|
||||
return Ok(Async::NotReady);
|
||||
}
|
||||
if stream_finished {
|
||||
return Err(PayloadError::Incomplete);
|
||||
// read untile eof?
|
||||
if self.eof {
|
||||
return Ok(Async::Ready(None));
|
||||
} else {
|
||||
return Err(PayloadError::Incomplete);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => return Err(err.into()),
|
||||
@@ -117,7 +122,7 @@ impl HttpResponseParser {
|
||||
|
||||
fn parse_message(
|
||||
buf: &mut BytesMut,
|
||||
) -> Poll<(ClientResponse, Option<EncodingDecoder>), ParseError> {
|
||||
) -> Poll<(ClientResponse, Option<(EncodingDecoder, bool)>), ParseError> {
|
||||
// Unsafe: we read only this data only after httparse parses headers into.
|
||||
// performance bump for pipeline benchmarks.
|
||||
let mut headers: [HeaderIndex; MAX_HEADERS] = unsafe { mem::uninitialized() };
|
||||
@@ -163,12 +168,12 @@ impl HttpResponseParser {
|
||||
}
|
||||
|
||||
let decoder = if status == StatusCode::SWITCHING_PROTOCOLS {
|
||||
Some(EncodingDecoder::eof())
|
||||
Some((EncodingDecoder::eof(), true))
|
||||
} else if let Some(len) = hdrs.get(header::CONTENT_LENGTH) {
|
||||
// Content-Length
|
||||
if let Ok(s) = len.to_str() {
|
||||
if let Ok(len) = s.parse::<u64>() {
|
||||
Some(EncodingDecoder::length(len))
|
||||
Some((EncodingDecoder::length(len), false))
|
||||
} else {
|
||||
debug!("illegal Content-Length: {:?}", len);
|
||||
return Err(ParseError::Header);
|
||||
@@ -179,7 +184,18 @@ impl HttpResponseParser {
|
||||
}
|
||||
} else if chunked(&hdrs)? {
|
||||
// Chunked encoding
|
||||
Some(EncodingDecoder::chunked())
|
||||
Some((EncodingDecoder::chunked(), false))
|
||||
} else if let Some(value) = hdrs.get(header::CONNECTION) {
|
||||
let close = if let Ok(s) = value.to_str() {
|
||||
s == "close"
|
||||
} else {
|
||||
false
|
||||
};
|
||||
if close {
|
||||
Some((EncodingDecoder::eof(), true))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
@@ -236,7 +236,6 @@ macro_rules! FROM_STR {
|
||||
($type:ty) => {
|
||||
impl FromParam for $type {
|
||||
type Err = InternalError<<$type as FromStr>::Err>;
|
||||
|
||||
fn from_param(val: &str) -> Result<Self, Self::Err> {
|
||||
<$type as FromStr>::from_str(val)
|
||||
.map_err(|e| InternalError::new(e, StatusCode::BAD_REQUEST))
|
||||
|
124
src/scope.rs
124
src/scope.rs
@@ -5,7 +5,10 @@ use std::rc::Rc;
|
||||
use futures::{Async, Future, Poll};
|
||||
|
||||
use error::Error;
|
||||
use handler::{AsyncResult, AsyncResultItem, FromRequest, Responder, RouteHandler};
|
||||
use handler::{
|
||||
AsyncResult, AsyncResultItem, FromRequest, Handler, Responder, RouteHandler,
|
||||
WrapHandler,
|
||||
};
|
||||
use http::Method;
|
||||
use httprequest::HttpRequest;
|
||||
use httpresponse::HttpResponse;
|
||||
@@ -180,7 +183,7 @@ impl<S: 'static> Scope<S> {
|
||||
where
|
||||
F: FnOnce(Scope<S>) -> Scope<S>,
|
||||
{
|
||||
let rdef = ResourceDef::prefix(&path);
|
||||
let rdef = ResourceDef::prefix(&insert_slash(path));
|
||||
let scope = Scope {
|
||||
rdef: rdef.clone(),
|
||||
filters: Vec::new(),
|
||||
@@ -227,9 +230,11 @@ impl<S: 'static> Scope<S> {
|
||||
R: Responder + 'static,
|
||||
T: FromRequest<S> + 'static,
|
||||
{
|
||||
Rc::get_mut(&mut self.router)
|
||||
.unwrap()
|
||||
.register_route(path, method, f);
|
||||
Rc::get_mut(&mut self.router).unwrap().register_route(
|
||||
&insert_slash(path),
|
||||
method,
|
||||
f,
|
||||
);
|
||||
self
|
||||
}
|
||||
|
||||
@@ -261,7 +266,7 @@ impl<S: 'static> Scope<S> {
|
||||
F: FnOnce(&mut Resource<S>) -> R + 'static,
|
||||
{
|
||||
// add resource
|
||||
let mut resource = Resource::new(ResourceDef::new(path));
|
||||
let mut resource = Resource::new(ResourceDef::new(&insert_slash(path)));
|
||||
f(&mut resource);
|
||||
|
||||
Rc::get_mut(&mut self.router)
|
||||
@@ -286,6 +291,35 @@ impl<S: 'static> Scope<S> {
|
||||
self
|
||||
}
|
||||
|
||||
/// Configure handler for specific path prefix.
|
||||
///
|
||||
/// A path prefix consists of valid path segments, i.e for the
|
||||
/// prefix `/app` any request with the paths `/app`, `/app/` or
|
||||
/// `/app/test` would match, but the path `/application` would
|
||||
/// not.
|
||||
///
|
||||
/// ```rust
|
||||
/// # extern crate actix_web;
|
||||
/// use actix_web::{http, App, HttpRequest, HttpResponse};
|
||||
///
|
||||
/// fn main() {
|
||||
/// let app = App::new().scope("/scope-prefix", |scope| {
|
||||
/// scope.handler("/app", |req: &HttpRequest| match *req.method() {
|
||||
/// http::Method::GET => HttpResponse::Ok(),
|
||||
/// http::Method::POST => HttpResponse::MethodNotAllowed(),
|
||||
/// _ => HttpResponse::NotFound(),
|
||||
/// })
|
||||
/// });
|
||||
/// }
|
||||
/// ```
|
||||
pub fn handler<H: Handler<S>>(mut self, path: &str, handler: H) -> Scope<S> {
|
||||
let path = insert_slash(path.trim().trim_right_matches('/'));
|
||||
Rc::get_mut(&mut self.router)
|
||||
.expect("Multiple copies of scope router")
|
||||
.register_handler(&path, Box::new(WrapHandler::new(handler)), None);
|
||||
self
|
||||
}
|
||||
|
||||
/// Register a scope middleware
|
||||
///
|
||||
/// This is similar to `App's` middlewares, but
|
||||
@@ -301,6 +335,14 @@ impl<S: 'static> Scope<S> {
|
||||
}
|
||||
}
|
||||
|
||||
fn insert_slash(path: &str) -> String {
|
||||
let mut path = path.to_owned();
|
||||
if !path.is_empty() && !path.starts_with('/') {
|
||||
path.insert(0, '/');
|
||||
};
|
||||
path
|
||||
}
|
||||
|
||||
impl<S: 'static> RouteHandler<S> for Scope<S> {
|
||||
fn handle(&self, req: &HttpRequest<S>) -> AsyncResult<HttpResponse> {
|
||||
let tail = req.match_info().tail as usize;
|
||||
@@ -803,6 +845,34 @@ mod tests {
|
||||
assert_eq!(resp.as_msg().status(), StatusCode::NOT_FOUND);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_scope_route_without_leading_slash() {
|
||||
let app = App::new()
|
||||
.scope("app", |scope| {
|
||||
scope
|
||||
.route("path1", Method::GET, |_: HttpRequest<_>| HttpResponse::Ok())
|
||||
.route("path1", Method::DELETE, |_: HttpRequest<_>| {
|
||||
HttpResponse::Ok()
|
||||
})
|
||||
}).finish();
|
||||
|
||||
let req = TestRequest::with_uri("/app/path1").request();
|
||||
let resp = app.run(req);
|
||||
assert_eq!(resp.as_msg().status(), StatusCode::OK);
|
||||
|
||||
let req = TestRequest::with_uri("/app/path1")
|
||||
.method(Method::DELETE)
|
||||
.request();
|
||||
let resp = app.run(req);
|
||||
assert_eq!(resp.as_msg().status(), StatusCode::OK);
|
||||
|
||||
let req = TestRequest::with_uri("/app/path1")
|
||||
.method(Method::POST)
|
||||
.request();
|
||||
let resp = app.run(req);
|
||||
assert_eq!(resp.as_msg().status(), StatusCode::NOT_FOUND);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_scope_filter() {
|
||||
let app = App::new()
|
||||
@@ -972,6 +1042,20 @@ mod tests {
|
||||
assert_eq!(resp.as_msg().status(), StatusCode::CREATED);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_nested_scope_no_slash() {
|
||||
let app = App::new()
|
||||
.scope("/app", |scope| {
|
||||
scope.nested("t1", |scope| {
|
||||
scope.resource("/path1", |r| r.f(|_| HttpResponse::Created()))
|
||||
})
|
||||
}).finish();
|
||||
|
||||
let req = TestRequest::with_uri("/app/t1/path1").request();
|
||||
let resp = app.run(req);
|
||||
assert_eq!(resp.as_msg().status(), StatusCode::CREATED);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_nested_scope_root() {
|
||||
let app = App::new()
|
||||
@@ -1120,4 +1204,32 @@ mod tests {
|
||||
let resp = app.run(req);
|
||||
assert_eq!(resp.as_msg().status(), StatusCode::METHOD_NOT_ALLOWED);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_handler() {
|
||||
let app = App::new()
|
||||
.scope("/scope", |scope| {
|
||||
scope.handler("/test", |_: &_| HttpResponse::Ok())
|
||||
}).finish();
|
||||
|
||||
let req = TestRequest::with_uri("/scope/test").request();
|
||||
let resp = app.run(req);
|
||||
assert_eq!(resp.as_msg().status(), StatusCode::OK);
|
||||
|
||||
let req = TestRequest::with_uri("/scope/test/").request();
|
||||
let resp = app.run(req);
|
||||
assert_eq!(resp.as_msg().status(), StatusCode::OK);
|
||||
|
||||
let req = TestRequest::with_uri("/scope/test/app").request();
|
||||
let resp = app.run(req);
|
||||
assert_eq!(resp.as_msg().status(), StatusCode::OK);
|
||||
|
||||
let req = TestRequest::with_uri("/scope/testapp").request();
|
||||
let resp = app.run(req);
|
||||
assert_eq!(resp.as_msg().status(), StatusCode::NOT_FOUND);
|
||||
|
||||
let req = TestRequest::with_uri("/scope/blah").request();
|
||||
let resp = app.run(req);
|
||||
assert_eq!(resp.as_msg().status(), StatusCode::NOT_FOUND);
|
||||
}
|
||||
}
|
||||
|
@@ -94,6 +94,7 @@ where
|
||||
};
|
||||
}
|
||||
|
||||
let mut is_eof = false;
|
||||
let kind = match self.proto {
|
||||
Some(HttpProtocol::H1(ref mut h1)) => {
|
||||
let result = h1.poll();
|
||||
@@ -120,16 +121,27 @@ where
|
||||
return result;
|
||||
}
|
||||
Some(HttpProtocol::Unknown(_, _, ref mut io, ref mut buf)) => {
|
||||
let mut disconnect = false;
|
||||
match io.read_available(buf) {
|
||||
Ok(Async::Ready(true)) | Err(_) => {
|
||||
debug!("Ignored premature client disconnection");
|
||||
if let Some(n) = self.node.as_mut() {
|
||||
n.remove()
|
||||
};
|
||||
return Err(());
|
||||
Ok(Async::Ready((read_some, stream_closed))) => {
|
||||
is_eof = stream_closed;
|
||||
// Only disconnect if no data was read.
|
||||
if is_eof && !read_some {
|
||||
disconnect = true;
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
disconnect = true;
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
if disconnect {
|
||||
debug!("Ignored premature client disconnection");
|
||||
if let Some(n) = self.node.as_mut() {
|
||||
n.remove()
|
||||
};
|
||||
return Err(());
|
||||
}
|
||||
|
||||
if buf.len() >= 14 {
|
||||
if buf[..14] == HTTP2_PREFACE[..] {
|
||||
@@ -148,8 +160,9 @@ where
|
||||
if let Some(HttpProtocol::Unknown(settings, addr, io, buf)) = self.proto.take() {
|
||||
match kind {
|
||||
ProtocolKind::Http1 => {
|
||||
self.proto =
|
||||
Some(HttpProtocol::H1(h1::Http1::new(settings, io, addr, buf)));
|
||||
self.proto = Some(HttpProtocol::H1(h1::Http1::new(
|
||||
settings, io, addr, buf, is_eof,
|
||||
)));
|
||||
return self.poll();
|
||||
}
|
||||
ProtocolKind::Http2 => {
|
||||
|
@@ -21,7 +21,12 @@ impl HttpHandlerTask for ServerError {
|
||||
bytes.reserve(helpers::STATUS_LINE_BUF_SIZE + 1);
|
||||
helpers::write_status_line(self.0, self.1.as_u16(), bytes);
|
||||
}
|
||||
// Convert Status Code to Reason.
|
||||
let reason = self.1.canonical_reason().unwrap_or("");
|
||||
io.buffer().extend_from_slice(reason.as_bytes());
|
||||
// No response body.
|
||||
io.buffer().extend_from_slice(b"\r\ncontent-length: 0\r\n");
|
||||
// date header
|
||||
io.set_date();
|
||||
Ok(Async::Ready(true))
|
||||
}
|
||||
|
138
src/server/h1.rs
138
src/server/h1.rs
@@ -22,13 +22,14 @@ use super::{HttpHandler, HttpHandlerTask, IoStream};
|
||||
const MAX_PIPELINED_MESSAGES: usize = 16;
|
||||
|
||||
bitflags! {
|
||||
struct Flags: u8 {
|
||||
const STARTED = 0b0000_0001;
|
||||
const ERROR = 0b0000_0010;
|
||||
const KEEPALIVE = 0b0000_0100;
|
||||
const SHUTDOWN = 0b0000_1000;
|
||||
const DISCONNECTED = 0b0001_0000;
|
||||
const POLLED = 0b0010_0000;
|
||||
pub struct Flags: u8 {
|
||||
const STARTED = 0b0000_0001;
|
||||
const ERROR = 0b0000_0010;
|
||||
const KEEPALIVE = 0b0000_0100;
|
||||
const SHUTDOWN = 0b0000_1000;
|
||||
const READ_DISCONNECTED = 0b0001_0000;
|
||||
const WRITE_DISCONNECTED = 0b0010_0000;
|
||||
const POLLED = 0b0100_0000;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -90,10 +91,14 @@ where
|
||||
{
|
||||
pub fn new(
|
||||
settings: Rc<WorkerSettings<H>>, stream: T, addr: Option<SocketAddr>,
|
||||
buf: BytesMut,
|
||||
buf: BytesMut, is_eof: bool,
|
||||
) -> Self {
|
||||
Http1 {
|
||||
flags: Flags::KEEPALIVE,
|
||||
flags: if is_eof {
|
||||
Flags::READ_DISCONNECTED
|
||||
} else {
|
||||
Flags::KEEPALIVE
|
||||
},
|
||||
stream: H1Writer::new(stream, Rc::clone(&settings)),
|
||||
decoder: H1Decoder::new(),
|
||||
payload: None,
|
||||
@@ -117,6 +122,13 @@ where
|
||||
|
||||
#[inline]
|
||||
fn can_read(&self) -> bool {
|
||||
if self
|
||||
.flags
|
||||
.intersects(Flags::ERROR | Flags::READ_DISCONNECTED)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if let Some(ref info) = self.payload {
|
||||
info.need_read() == PayloadStatus::Read
|
||||
} else {
|
||||
@@ -125,6 +137,8 @@ where
|
||||
}
|
||||
|
||||
fn notify_disconnect(&mut self) {
|
||||
self.flags.insert(Flags::WRITE_DISCONNECTED);
|
||||
|
||||
// notify all tasks
|
||||
self.stream.disconnected();
|
||||
for task in &mut self.tasks {
|
||||
@@ -132,6 +146,21 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
fn client_disconnect(&mut self) {
|
||||
// notify all tasks
|
||||
self.notify_disconnect();
|
||||
// kill keepalive
|
||||
self.keepalive_timer.take();
|
||||
|
||||
// on parse error, stop reading stream but tasks need to be
|
||||
// completed
|
||||
self.flags.insert(Flags::ERROR);
|
||||
|
||||
if let Some(mut payload) = self.payload.take() {
|
||||
payload.set_error(PayloadError::Incomplete);
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn poll(&mut self) -> Poll<(), ()> {
|
||||
// keep-alive timer
|
||||
@@ -148,6 +177,11 @@ where
|
||||
|
||||
// shutdown
|
||||
if self.flags.contains(Flags::SHUTDOWN) {
|
||||
if self.flags.intersects(
|
||||
Flags::ERROR | Flags::READ_DISCONNECTED | Flags::WRITE_DISCONNECTED,
|
||||
) {
|
||||
return Ok(Async::Ready(()));
|
||||
}
|
||||
match self.stream.poll_completed(true) {
|
||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||
Ok(Async::Ready(_)) => return Ok(Async::Ready(())),
|
||||
@@ -182,44 +216,25 @@ where
|
||||
self.flags.insert(Flags::POLLED);
|
||||
return;
|
||||
}
|
||||
|
||||
// read io from socket
|
||||
if !self.flags.intersects(Flags::ERROR)
|
||||
&& self.tasks.len() < MAX_PIPELINED_MESSAGES
|
||||
&& self.can_read()
|
||||
{
|
||||
if self.can_read() && self.tasks.len() < MAX_PIPELINED_MESSAGES {
|
||||
match self.stream.get_mut().read_available(&mut self.buf) {
|
||||
Ok(Async::Ready(disconnected)) => {
|
||||
if disconnected {
|
||||
// notify all tasks
|
||||
self.notify_disconnect();
|
||||
// kill keepalive
|
||||
self.keepalive_timer.take();
|
||||
|
||||
// on parse error, stop reading stream but tasks need to be
|
||||
// completed
|
||||
self.flags.insert(Flags::ERROR);
|
||||
|
||||
if let Some(mut payload) = self.payload.take() {
|
||||
payload.set_error(PayloadError::Incomplete);
|
||||
}
|
||||
} else {
|
||||
Ok(Async::Ready((read_some, disconnected))) => {
|
||||
if read_some {
|
||||
self.parse();
|
||||
}
|
||||
if disconnected {
|
||||
// delay disconnect until all tasks have finished.
|
||||
self.flags.insert(Flags::READ_DISCONNECTED);
|
||||
if self.tasks.is_empty() {
|
||||
self.client_disconnect();
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(Async::NotReady) => (),
|
||||
Err(_) => {
|
||||
// notify all tasks
|
||||
self.notify_disconnect();
|
||||
// kill keepalive
|
||||
self.keepalive_timer.take();
|
||||
|
||||
// on parse error, stop reading stream but tasks need to be
|
||||
// completed
|
||||
self.flags.insert(Flags::ERROR);
|
||||
|
||||
if let Some(mut payload) = self.payload.take() {
|
||||
payload.set_error(PayloadError::Incomplete);
|
||||
}
|
||||
self.client_disconnect();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -233,7 +248,10 @@ where
|
||||
let mut idx = 0;
|
||||
while idx < self.tasks.len() {
|
||||
// only one task can do io operation in http/1
|
||||
if !io && !self.tasks[idx].flags.contains(EntryFlags::EOF) {
|
||||
if !io
|
||||
&& !self.tasks[idx].flags.contains(EntryFlags::EOF)
|
||||
&& !self.flags.contains(Flags::WRITE_DISCONNECTED)
|
||||
{
|
||||
// io is corrupted, send buffer
|
||||
if self.tasks[idx].flags.contains(EntryFlags::ERROR) {
|
||||
if let Ok(Async::NotReady) = self.stream.poll_completed(true) {
|
||||
@@ -297,7 +315,6 @@ where
|
||||
}
|
||||
|
||||
// cleanup finished tasks
|
||||
let max = self.tasks.len() >= MAX_PIPELINED_MESSAGES;
|
||||
while !self.tasks.is_empty() {
|
||||
if self.tasks[0]
|
||||
.flags
|
||||
@@ -308,10 +325,6 @@ where
|
||||
break;
|
||||
}
|
||||
}
|
||||
// read more message
|
||||
if max && self.tasks.len() >= MAX_PIPELINED_MESSAGES {
|
||||
return Ok(Async::Ready(true));
|
||||
}
|
||||
|
||||
// check stream state
|
||||
if self.flags.contains(Flags::STARTED) {
|
||||
@@ -331,8 +344,12 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
// deal with keep-alive
|
||||
// deal with keep-alive and steam eof (client-side write shutdown)
|
||||
if self.tasks.is_empty() {
|
||||
// handle stream eof
|
||||
if self.flags.contains(Flags::READ_DISCONNECTED) {
|
||||
return Ok(Async::Ready(false));
|
||||
}
|
||||
// no keep-alive
|
||||
if self.flags.contains(Flags::ERROR)
|
||||
|| (!self.flags.contains(Flags::KEEPALIVE)
|
||||
@@ -448,7 +465,14 @@ where
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(None) => break,
|
||||
Ok(None) => {
|
||||
if self.flags.contains(Flags::READ_DISCONNECTED)
|
||||
&& self.tasks.is_empty()
|
||||
{
|
||||
self.client_disconnect();
|
||||
}
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
self.flags.insert(Flags::ERROR);
|
||||
if let Some(mut payload) = self.payload.take() {
|
||||
@@ -603,24 +627,36 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_req_parse() {
|
||||
fn test_req_parse1() {
|
||||
let buf = Buffer::new("GET /test HTTP/1.1\r\n\r\n");
|
||||
let readbuf = BytesMut::new();
|
||||
let settings = Rc::new(wrk_settings());
|
||||
|
||||
let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf);
|
||||
let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf, false);
|
||||
h1.poll_io();
|
||||
h1.poll_io();
|
||||
assert_eq!(h1.tasks.len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_req_parse2() {
|
||||
let buf = Buffer::new("");
|
||||
let readbuf =
|
||||
BytesMut::from(Vec::<u8>::from(&b"GET /test HTTP/1.1\r\n\r\n"[..]));
|
||||
let settings = Rc::new(wrk_settings());
|
||||
|
||||
let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf, true);
|
||||
h1.poll_io();
|
||||
assert_eq!(h1.tasks.len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_req_parse_err() {
|
||||
let buf = Buffer::new("GET /test HTTP/1\r\n\r\n");
|
||||
let readbuf = BytesMut::new();
|
||||
let settings = Rc::new(wrk_settings());
|
||||
|
||||
let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf);
|
||||
let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf, false);
|
||||
h1.poll_io();
|
||||
h1.poll_io();
|
||||
assert!(h1.flags.contains(Flags::ERROR));
|
||||
|
@@ -63,7 +63,9 @@ impl<T: AsyncWrite, H: 'static> H1Writer<T, H> {
|
||||
self.flags = Flags::KEEPALIVE;
|
||||
}
|
||||
|
||||
pub fn disconnected(&mut self) {}
|
||||
pub fn disconnected(&mut self) {
|
||||
self.flags.insert(Flags::DISCONNECTED);
|
||||
}
|
||||
|
||||
pub fn keepalive(&self) -> bool {
|
||||
self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE)
|
||||
@@ -268,10 +270,7 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
|
||||
let pl: &[u8] = payload.as_ref();
|
||||
let n = match Self::write_data(&mut self.stream, pl) {
|
||||
Err(err) => {
|
||||
if err.kind() == io::ErrorKind::WriteZero {
|
||||
self.disconnected();
|
||||
}
|
||||
|
||||
self.disconnected();
|
||||
return Err(err);
|
||||
}
|
||||
Ok(val) => val,
|
||||
@@ -315,14 +314,15 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
|
||||
|
||||
#[inline]
|
||||
fn poll_completed(&mut self, shutdown: bool) -> Poll<(), io::Error> {
|
||||
if self.flags.contains(Flags::DISCONNECTED) {
|
||||
return Err(io::Error::new(io::ErrorKind::Other, "disconnected"));
|
||||
}
|
||||
|
||||
if !self.buffer.is_empty() {
|
||||
let written = {
|
||||
match Self::write_data(&mut self.stream, self.buffer.as_ref().as_ref()) {
|
||||
Err(err) => {
|
||||
if err.kind() == io::ErrorKind::WriteZero {
|
||||
self.disconnected();
|
||||
}
|
||||
|
||||
self.disconnected();
|
||||
return Err(err);
|
||||
}
|
||||
Ok(val) => val,
|
||||
@@ -339,7 +339,7 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
|
||||
self.stream.poll_flush()?;
|
||||
self.stream.shutdown()
|
||||
} else {
|
||||
self.stream.poll_flush()
|
||||
Ok(self.stream.poll_flush()?)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -390,7 +390,7 @@ pub trait IoStream: AsyncRead + AsyncWrite + 'static {
|
||||
|
||||
fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()>;
|
||||
|
||||
fn read_available(&mut self, buf: &mut BytesMut) -> Poll<bool, io::Error> {
|
||||
fn read_available(&mut self, buf: &mut BytesMut) -> Poll<(bool, bool), io::Error> {
|
||||
let mut read_some = false;
|
||||
loop {
|
||||
if buf.remaining_mut() < LW_BUFFER_SIZE {
|
||||
@@ -400,7 +400,7 @@ pub trait IoStream: AsyncRead + AsyncWrite + 'static {
|
||||
match self.read(buf.bytes_mut()) {
|
||||
Ok(n) => {
|
||||
if n == 0 {
|
||||
return Ok(Async::Ready(!read_some));
|
||||
return Ok(Async::Ready((read_some, true)));
|
||||
} else {
|
||||
read_some = true;
|
||||
buf.advance_mut(n);
|
||||
@@ -409,7 +409,7 @@ pub trait IoStream: AsyncRead + AsyncWrite + 'static {
|
||||
Err(e) => {
|
||||
return if e.kind() == io::ErrorKind::WouldBlock {
|
||||
if read_some {
|
||||
Ok(Async::Ready(false))
|
||||
Ok(Async::Ready((read_some, false)))
|
||||
} else {
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
|
@@ -217,7 +217,7 @@ impl Server {
|
||||
// start accept thread
|
||||
for sock in &self.sockets {
|
||||
for s in sock.iter() {
|
||||
info!("Starting server on http://{:?}", s.1.local_addr().ok());
|
||||
info!("Starting server on http://{}", s.1.local_addr().unwrap());
|
||||
}
|
||||
}
|
||||
let rx = self
|
||||
|
@@ -8,7 +8,8 @@ extern crate rand;
|
||||
#[cfg(all(unix, feature = "uds"))]
|
||||
extern crate tokio_uds;
|
||||
|
||||
use std::io::Read;
|
||||
use std::io::{Read, Write};
|
||||
use std::{net, thread};
|
||||
|
||||
use bytes::Bytes;
|
||||
use flate2::read::GzDecoder;
|
||||
@@ -66,6 +67,16 @@ fn test_simple() {
|
||||
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_connection_close() {
|
||||
let mut srv =
|
||||
test::TestServer::new(|app| app.handler(|_| HttpResponse::Ok().body(STR)));
|
||||
|
||||
let request = srv.get().header("Connection", "close").finish().unwrap();
|
||||
let response = srv.execute(request.send()).unwrap();
|
||||
assert!(response.status().is_success());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_with_query_parameter() {
|
||||
let mut srv = test::TestServer::new(|app| {
|
||||
@@ -460,3 +471,33 @@ fn test_default_headers() {
|
||||
"\""
|
||||
)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn client_read_until_eof() {
|
||||
let addr = test::TestServer::unused_addr();
|
||||
|
||||
thread::spawn(move || {
|
||||
let lst = net::TcpListener::bind(addr).unwrap();
|
||||
|
||||
for stream in lst.incoming() {
|
||||
let mut stream = stream.unwrap();
|
||||
let mut b = [0; 1000];
|
||||
let _ = stream.read(&mut b).unwrap();
|
||||
let _ = stream
|
||||
.write_all(b"HTTP/1.1 200 OK\r\nconnection: close\r\n\r\nwelcome!");
|
||||
}
|
||||
});
|
||||
|
||||
let mut sys = actix::System::new("test");
|
||||
|
||||
// client request
|
||||
let req = client::ClientRequest::get(format!("http://{}/", addr).as_str())
|
||||
.finish()
|
||||
.unwrap();
|
||||
let response = sys.block_on(req.send()).unwrap();
|
||||
assert!(response.status().is_success());
|
||||
|
||||
// read response
|
||||
let bytes = sys.block_on(response.body()).unwrap();
|
||||
assert_eq!(bytes, Bytes::from_static(b"welcome!"));
|
||||
}
|
||||
|
@@ -932,6 +932,28 @@ fn test_application() {
|
||||
assert!(response.status().is_success());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_default_404_handler_response() {
|
||||
let mut srv = test::TestServer::with_factory(|| {
|
||||
App::new()
|
||||
.prefix("/app")
|
||||
.resource("", |r| r.f(|_| HttpResponse::Ok()))
|
||||
.resource("/", |r| r.f(|_| HttpResponse::Ok()))
|
||||
});
|
||||
let addr = srv.addr();
|
||||
|
||||
let mut buf = [0; 24];
|
||||
let request = TcpStream::connect(&addr)
|
||||
.and_then(|sock| {
|
||||
tokio::io::write_all(sock, "HEAD / HTTP/1.1\r\nHost: localhost\r\n\r\n")
|
||||
.and_then(|(sock, _)| tokio::io::read_exact(sock, &mut buf))
|
||||
.and_then(|(_, buf)| Ok(buf))
|
||||
}).map_err(|e| panic!("{:?}", e));
|
||||
let response = srv.execute(request).unwrap();
|
||||
let rep = String::from_utf8_lossy(&response[..]);
|
||||
assert!(rep.contains("HTTP/1.1 404 Not Found"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_server_cookies() {
|
||||
use actix_web::http;
|
||||
|
Reference in New Issue
Block a user