mirror of
https://github.com/fafhrd91/actix-web
synced 2025-07-28 03:15:11 +02:00
Compare commits
29 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
1fe4315c94 | ||
|
381b90e9a1 | ||
|
2d18dba40a | ||
|
d2693d58a8 | ||
|
84bf282c17 | ||
|
b15b5e5246 | ||
|
52b3b0c362 | ||
|
64c4cefa8f | ||
|
7e8b231f57 | ||
|
8a344d0c94 | ||
|
4096089a3f | ||
|
b16f2d5f05 | ||
|
5baf15822a | ||
|
5368ce823e | ||
|
4effdf065b | ||
|
61970ab190 | ||
|
484b00a0f9 | ||
|
73bf2068aa | ||
|
1cda949204 | ||
|
ad6b823255 | ||
|
0f064db31d | ||
|
fd0bb54469 | ||
|
e27bbaa55c | ||
|
8a50eae1e2 | ||
|
38080f67b3 | ||
|
08504e0892 | ||
|
401c0ad809 | ||
|
b4b0deb7fa | ||
|
05ff35d383 |
17
CHANGES.md
17
CHANGES.md
@@ -1,5 +1,22 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
|
## 0.4.9 (2018-03-16)
|
||||||
|
|
||||||
|
* Allow to disable http/2 support
|
||||||
|
|
||||||
|
* Wake payload reading task when data is available
|
||||||
|
|
||||||
|
* Fix server keep-alive handling
|
||||||
|
|
||||||
|
* Send Query Parameters in client requests #120
|
||||||
|
|
||||||
|
* Move brotli encoding to a feature
|
||||||
|
|
||||||
|
* Add option of default handler for `StaticFiles` handler #57
|
||||||
|
|
||||||
|
* Add basic client connection pooling
|
||||||
|
|
||||||
|
|
||||||
## 0.4.8 (2018-03-12)
|
## 0.4.8 (2018-03-12)
|
||||||
|
|
||||||
* Allow to set read buffer capacity for server request
|
* Allow to set read buffer capacity for server request
|
||||||
|
11
Cargo.toml
11
Cargo.toml
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actix-web"
|
name = "actix-web"
|
||||||
version = "0.4.8"
|
version = "0.4.9"
|
||||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||||
description = "Actix web is a simple, pragmatic, extremely fast, web framework for Rust."
|
description = "Actix web is a simple, pragmatic, extremely fast, web framework for Rust."
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
@@ -27,7 +27,7 @@ name = "actix_web"
|
|||||||
path = "src/lib.rs"
|
path = "src/lib.rs"
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = ["session"]
|
default = ["session", "brotli"]
|
||||||
|
|
||||||
# tls
|
# tls
|
||||||
tls = ["native-tls", "tokio-tls"]
|
tls = ["native-tls", "tokio-tls"]
|
||||||
@@ -38,12 +38,14 @@ alpn = ["openssl", "openssl/v102", "openssl/v110", "tokio-openssl"]
|
|||||||
# sessions
|
# sessions
|
||||||
session = ["cookie/secure"]
|
session = ["cookie/secure"]
|
||||||
|
|
||||||
|
# brotli encoding
|
||||||
|
brotli = ["brotli2"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
actix = "^0.5.2"
|
actix = "^0.5.4"
|
||||||
|
|
||||||
base64 = "0.9"
|
base64 = "0.9"
|
||||||
bitflags = "1.0"
|
bitflags = "1.0"
|
||||||
brotli2 = "^0.3.2"
|
|
||||||
failure = "0.1.1"
|
failure = "0.1.1"
|
||||||
flate2 = "1.0"
|
flate2 = "1.0"
|
||||||
h2 = "0.1"
|
h2 = "0.1"
|
||||||
@@ -67,6 +69,7 @@ encoding = "0.2"
|
|||||||
language-tags = "0.2"
|
language-tags = "0.2"
|
||||||
url = { version="1.7", features=["query_encoding"] }
|
url = { version="1.7", features=["query_encoding"] }
|
||||||
cookie = { version="0.10", features=["percent-encode"] }
|
cookie = { version="0.10", features=["percent-encode"] }
|
||||||
|
brotli2 = { version="^0.3.2", optional = true }
|
||||||
|
|
||||||
# io
|
# io
|
||||||
mio = "^0.6.13"
|
mio = "^0.6.13"
|
||||||
|
@@ -1,4 +1,4 @@
|
|||||||
# websockect
|
# websocket
|
||||||
|
|
||||||
Simple echo websocket server.
|
Simple echo websocket server.
|
||||||
|
|
||||||
|
@@ -1,15 +1,18 @@
|
|||||||
use std::{io, time};
|
use std::{fmt, io, time};
|
||||||
|
use std::cell::RefCell;
|
||||||
|
use std::rc::Rc;
|
||||||
use std::net::Shutdown;
|
use std::net::Shutdown;
|
||||||
use std::time::Duration;
|
use std::time::{Duration, Instant};
|
||||||
|
use std::collections::{HashMap, VecDeque};
|
||||||
|
|
||||||
use actix::{fut, Actor, ActorFuture, Context,
|
use actix::{fut, Actor, ActorFuture, Context, AsyncContext,
|
||||||
Handler, Message, ActorResponse, Supervised};
|
Handler, Message, ActorResponse, Supervised};
|
||||||
use actix::registry::ArbiterService;
|
use actix::registry::ArbiterService;
|
||||||
use actix::fut::WrapFuture;
|
use actix::fut::WrapFuture;
|
||||||
use actix::actors::{Connector, ConnectorError, Connect as ResolveConnect};
|
use actix::actors::{Connector, ConnectorError, Connect as ResolveConnect};
|
||||||
|
|
||||||
use http::{Uri, HttpTryFrom, Error as HttpError};
|
use http::{Uri, HttpTryFrom, Error as HttpError};
|
||||||
use futures::Poll;
|
use futures::{Async, Poll};
|
||||||
use tokio_io::{AsyncRead, AsyncWrite};
|
use tokio_io::{AsyncRead, AsyncWrite};
|
||||||
|
|
||||||
#[cfg(feature="alpn")]
|
#[cfg(feature="alpn")]
|
||||||
@@ -104,10 +107,15 @@ pub struct ClientConnector {
|
|||||||
connector: SslConnector,
|
connector: SslConnector,
|
||||||
#[cfg(all(feature="tls", not(feature="alpn")))]
|
#[cfg(all(feature="tls", not(feature="alpn")))]
|
||||||
connector: TlsConnector,
|
connector: TlsConnector,
|
||||||
|
pool: Rc<Pool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Actor for ClientConnector {
|
impl Actor for ClientConnector {
|
||||||
type Context = Context<ClientConnector>;
|
type Context = Context<ClientConnector>;
|
||||||
|
|
||||||
|
fn started(&mut self, ctx: &mut Self::Context) {
|
||||||
|
self.collect(ctx);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Supervised for ClientConnector {}
|
impl Supervised for ClientConnector {}
|
||||||
@@ -120,19 +128,21 @@ impl Default for ClientConnector {
|
|||||||
{
|
{
|
||||||
let builder = SslConnector::builder(SslMethod::tls()).unwrap();
|
let builder = SslConnector::builder(SslMethod::tls()).unwrap();
|
||||||
ClientConnector {
|
ClientConnector {
|
||||||
connector: builder.build()
|
connector: builder.build(),
|
||||||
|
pool: Rc::new(Pool::new()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#[cfg(all(feature="tls", not(feature="alpn")))]
|
#[cfg(all(feature="tls", not(feature="alpn")))]
|
||||||
{
|
{
|
||||||
let builder = TlsConnector::builder().unwrap();
|
let builder = TlsConnector::builder().unwrap();
|
||||||
ClientConnector {
|
ClientConnector {
|
||||||
connector: builder.build().unwrap()
|
connector: builder.build().unwrap(),
|
||||||
|
pool: Rc::new(Pool::new()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(not(any(feature="alpn", feature="tls")))]
|
#[cfg(not(any(feature="alpn", feature="tls")))]
|
||||||
ClientConnector {}
|
ClientConnector {pool: Rc::new(Pool::new())}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -182,7 +192,12 @@ impl ClientConnector {
|
|||||||
/// }
|
/// }
|
||||||
/// ```
|
/// ```
|
||||||
pub fn with_connector(connector: SslConnector) -> ClientConnector {
|
pub fn with_connector(connector: SslConnector) -> ClientConnector {
|
||||||
ClientConnector { connector }
|
ClientConnector { connector, pool: Rc::new(Pool::new()) }
|
||||||
|
}
|
||||||
|
|
||||||
|
fn collect(&mut self, ctx: &mut Context<Self>) {
|
||||||
|
self.pool.collect();
|
||||||
|
ctx.run_later(Duration::from_secs(1), |act, ctx| act.collect(ctx));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -214,10 +229,21 @@ impl Handler<Connect> for ClientConnector {
|
|||||||
|
|
||||||
let host = uri.host().unwrap().to_owned();
|
let host = uri.host().unwrap().to_owned();
|
||||||
let port = uri.port().unwrap_or_else(|| proto.port());
|
let port = uri.port().unwrap_or_else(|| proto.port());
|
||||||
|
let key = Key {host, port, ssl: proto.is_secure()};
|
||||||
|
|
||||||
|
let pool = if proto.is_http() {
|
||||||
|
if let Some(conn) = self.pool.query(&key) {
|
||||||
|
return ActorResponse::async(fut::ok(conn))
|
||||||
|
} else {
|
||||||
|
Some(Rc::clone(&self.pool))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
ActorResponse::async(
|
ActorResponse::async(
|
||||||
Connector::from_registry()
|
Connector::from_registry()
|
||||||
.send(ResolveConnect::host_and_port(&host, port)
|
.send(ResolveConnect::host_and_port(&key.host, port)
|
||||||
.timeout(conn_timeout))
|
.timeout(conn_timeout))
|
||||||
.into_actor(self)
|
.into_actor(self)
|
||||||
.map_err(|_, _, _| ClientConnectorError::Disconnected)
|
.map_err(|_, _, _| ClientConnectorError::Disconnected)
|
||||||
@@ -228,12 +254,14 @@ impl Handler<Connect> for ClientConnector {
|
|||||||
Ok(stream) => {
|
Ok(stream) => {
|
||||||
if proto.is_secure() {
|
if proto.is_secure() {
|
||||||
fut::Either::A(
|
fut::Either::A(
|
||||||
_act.connector.connect_async(&host, stream)
|
_act.connector.connect_async(&key.host, stream)
|
||||||
.map_err(ClientConnectorError::SslError)
|
.map_err(ClientConnectorError::SslError)
|
||||||
.map(|stream| Connection{stream: Box::new(stream)})
|
.map(|stream| Connection::new(
|
||||||
|
key, pool, Box::new(stream)))
|
||||||
.into_actor(_act))
|
.into_actor(_act))
|
||||||
} else {
|
} else {
|
||||||
fut::Either::B(fut::ok(Connection{stream: Box::new(stream)}))
|
fut::Either::B(fut::ok(
|
||||||
|
Connection::new(key, pool, Box::new(stream))))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -244,12 +272,14 @@ impl Handler<Connect> for ClientConnector {
|
|||||||
Ok(stream) => {
|
Ok(stream) => {
|
||||||
if proto.is_secure() {
|
if proto.is_secure() {
|
||||||
fut::Either::A(
|
fut::Either::A(
|
||||||
_act.connector.connect_async(&host, stream)
|
_act.connector.connect_async(&key.host, stream)
|
||||||
.map_err(ClientConnectorError::SslError)
|
.map_err(ClientConnectorError::SslError)
|
||||||
.map(|stream| Connection{stream: Box::new(stream)})
|
.map(|stream| Connection::new(
|
||||||
|
key, pool, Box::new(stream)))
|
||||||
.into_actor(_act))
|
.into_actor(_act))
|
||||||
} else {
|
} else {
|
||||||
fut::Either::B(fut::ok(Connection{stream: Box::new(stream)}))
|
fut::Either::B(fut::ok(
|
||||||
|
Connection::new(key, pool, Box::new(stream))))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -261,7 +291,7 @@ impl Handler<Connect> for ClientConnector {
|
|||||||
if proto.is_secure() {
|
if proto.is_secure() {
|
||||||
fut::err(ClientConnectorError::SslIsNotSupported)
|
fut::err(ClientConnectorError::SslIsNotSupported)
|
||||||
} else {
|
} else {
|
||||||
fut::ok(Connection{stream: Box::new(stream)})
|
fut::ok(Connection::new(key, pool, Box::new(stream)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -288,6 +318,13 @@ impl Protocol {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn is_http(&self) -> bool {
|
||||||
|
match *self {
|
||||||
|
Protocol::Https | Protocol::Http => true,
|
||||||
|
_ => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn is_secure(&self) -> bool {
|
fn is_secure(&self) -> bool {
|
||||||
match *self {
|
match *self {
|
||||||
Protocol::Https | Protocol::Wss => true,
|
Protocol::Https | Protocol::Wss => true,
|
||||||
@@ -303,18 +340,156 @@ impl Protocol {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
|
||||||
|
struct Key {
|
||||||
|
host: String,
|
||||||
|
port: u16,
|
||||||
|
ssl: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Key {
|
||||||
|
fn empty() -> Key {
|
||||||
|
Key{host: String::new(), port: 0, ssl: false}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct Conn(Instant, Connection);
|
||||||
|
|
||||||
|
pub struct Pool {
|
||||||
|
max_size: usize,
|
||||||
|
keep_alive: Duration,
|
||||||
|
max_lifetime: Duration,
|
||||||
|
pool: RefCell<HashMap<Key, VecDeque<Conn>>>,
|
||||||
|
to_close: RefCell<Vec<Connection>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Pool {
|
||||||
|
fn new() -> Pool {
|
||||||
|
Pool {
|
||||||
|
max_size: 128,
|
||||||
|
keep_alive: Duration::from_secs(15),
|
||||||
|
max_lifetime: Duration::from_secs(75),
|
||||||
|
pool: RefCell::new(HashMap::new()),
|
||||||
|
to_close: RefCell::new(Vec::new()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn collect(&self) {
|
||||||
|
let mut pool = self.pool.borrow_mut();
|
||||||
|
let mut to_close = self.to_close.borrow_mut();
|
||||||
|
|
||||||
|
// check keep-alive
|
||||||
|
let now = Instant::now();
|
||||||
|
for conns in pool.values_mut() {
|
||||||
|
while !conns.is_empty() {
|
||||||
|
if (now - conns[0].0) > self.keep_alive
|
||||||
|
|| (now - conns[0].1.ts) > self.max_lifetime
|
||||||
|
{
|
||||||
|
let conn = conns.pop_front().unwrap().1;
|
||||||
|
to_close.push(conn);
|
||||||
|
} else {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// check connections for shutdown
|
||||||
|
let mut idx = 0;
|
||||||
|
while idx < to_close.len() {
|
||||||
|
match AsyncWrite::shutdown(&mut to_close[idx]) {
|
||||||
|
Ok(Async::NotReady) => idx += 1,
|
||||||
|
_ => {
|
||||||
|
to_close.swap_remove(idx);
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn query(&self, key: &Key) -> Option<Connection> {
|
||||||
|
let mut pool = self.pool.borrow_mut();
|
||||||
|
let mut to_close = self.to_close.borrow_mut();
|
||||||
|
|
||||||
|
if let Some(ref mut connections) = pool.get_mut(key) {
|
||||||
|
let now = Instant::now();
|
||||||
|
while let Some(conn) = connections.pop_back() {
|
||||||
|
// check if it still usable
|
||||||
|
if (now - conn.0) > self.keep_alive
|
||||||
|
|| (now - conn.1.ts) > self.max_lifetime
|
||||||
|
{
|
||||||
|
to_close.push(conn.1);
|
||||||
|
} else {
|
||||||
|
let mut conn = conn.1;
|
||||||
|
let mut buf = [0; 2];
|
||||||
|
match conn.stream().read(&mut buf) {
|
||||||
|
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => (),
|
||||||
|
Ok(n) if n > 0 => {
|
||||||
|
to_close.push(conn);
|
||||||
|
continue
|
||||||
|
},
|
||||||
|
Ok(_) | Err(_) => continue,
|
||||||
|
}
|
||||||
|
return Some(conn)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
fn release(&self, conn: Connection) {
|
||||||
|
if (Instant::now() - conn.ts) < self.max_lifetime {
|
||||||
|
let mut pool = self.pool.borrow_mut();
|
||||||
|
if !pool.contains_key(&conn.key) {
|
||||||
|
let key = conn.key.clone();
|
||||||
|
let mut vec = VecDeque::new();
|
||||||
|
vec.push_back(Conn(Instant::now(), conn));
|
||||||
|
pool.insert(key, vec);
|
||||||
|
} else {
|
||||||
|
let vec = pool.get_mut(&conn.key).unwrap();
|
||||||
|
vec.push_back(Conn(Instant::now(), conn));
|
||||||
|
if vec.len() > self.max_size {
|
||||||
|
let conn = vec.pop_front().unwrap();
|
||||||
|
self.to_close.borrow_mut().push(conn.1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
pub struct Connection {
|
pub struct Connection {
|
||||||
|
key: Key,
|
||||||
stream: Box<IoStream>,
|
stream: Box<IoStream>,
|
||||||
|
pool: Option<Rc<Pool>>,
|
||||||
|
ts: Instant,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Debug for Connection {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
write!(f, "Connection {}:{}", self.key.host, self.key.port)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Connection {
|
impl Connection {
|
||||||
|
fn new(key: Key, pool: Option<Rc<Pool>>, stream: Box<IoStream>) -> Self {
|
||||||
|
Connection {
|
||||||
|
key, pool, stream,
|
||||||
|
ts: Instant::now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn stream(&mut self) -> &mut IoStream {
|
pub fn stream(&mut self) -> &mut IoStream {
|
||||||
&mut *self.stream
|
&mut *self.stream
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn from_stream<T: IoStream>(io: T) -> Connection {
|
pub fn from_stream<T: IoStream>(io: T) -> Connection {
|
||||||
Connection{stream: Box::new(io)}
|
Connection::new(Key::empty(), None, Box::new(io))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn release(mut self) {
|
||||||
|
if let Some(pool) = self.pool.take() {
|
||||||
|
pool.release(self)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -171,7 +171,8 @@ impl Future for SendRequest {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let pl = Box::new(Pipeline {
|
let pl = Box::new(Pipeline {
|
||||||
body, conn, writer,
|
body, writer,
|
||||||
|
conn: Some(conn),
|
||||||
parser: Some(HttpResponseParser::default()),
|
parser: Some(HttpResponseParser::default()),
|
||||||
parser_buf: BytesMut::new(),
|
parser_buf: BytesMut::new(),
|
||||||
disconnected: false,
|
disconnected: false,
|
||||||
@@ -208,7 +209,7 @@ impl Future for SendRequest {
|
|||||||
|
|
||||||
pub(crate) struct Pipeline {
|
pub(crate) struct Pipeline {
|
||||||
body: IoBody,
|
body: IoBody,
|
||||||
conn: Connection,
|
conn: Option<Connection>,
|
||||||
writer: HttpClientWriter,
|
writer: HttpClientWriter,
|
||||||
parser: Option<HttpResponseParser>,
|
parser: Option<HttpResponseParser>,
|
||||||
parser_buf: BytesMut,
|
parser_buf: BytesMut,
|
||||||
@@ -249,9 +250,16 @@ impl RunningState {
|
|||||||
|
|
||||||
impl Pipeline {
|
impl Pipeline {
|
||||||
|
|
||||||
|
fn release_conn(&mut self) {
|
||||||
|
if let Some(conn) = self.conn.take() {
|
||||||
|
conn.release()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn parse(&mut self) -> Poll<ClientResponse, HttpResponseParserError> {
|
fn parse(&mut self) -> Poll<ClientResponse, HttpResponseParserError> {
|
||||||
match self.parser.as_mut().unwrap().parse(&mut self.conn, &mut self.parser_buf) {
|
if let Some(ref mut conn) = self.conn {
|
||||||
|
match self.parser.as_mut().unwrap().parse(conn, &mut self.parser_buf) {
|
||||||
Ok(Async::Ready(resp)) => {
|
Ok(Async::Ready(resp)) => {
|
||||||
// check content-encoding
|
// check content-encoding
|
||||||
if self.should_decompress {
|
if self.should_decompress {
|
||||||
@@ -269,10 +277,18 @@ impl Pipeline {
|
|||||||
}
|
}
|
||||||
val => val,
|
val => val,
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
Ok(Async::NotReady)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn poll(&mut self) -> Poll<Option<Bytes>, PayloadError> {
|
pub fn poll(&mut self) -> Poll<Option<Bytes>, PayloadError> {
|
||||||
|
if self.conn.is_none() {
|
||||||
|
return Ok(Async::Ready(None))
|
||||||
|
}
|
||||||
|
let conn: &mut Connection = unsafe{ mem::transmute(self.conn.as_mut().unwrap())};
|
||||||
|
|
||||||
let mut need_run = false;
|
let mut need_run = false;
|
||||||
|
|
||||||
// need write?
|
// need write?
|
||||||
@@ -286,7 +302,7 @@ impl Pipeline {
|
|||||||
if self.parser.is_some() {
|
if self.parser.is_some() {
|
||||||
loop {
|
loop {
|
||||||
match self.parser.as_mut().unwrap()
|
match self.parser.as_mut().unwrap()
|
||||||
.parse_payload(&mut self.conn, &mut self.parser_buf)?
|
.parse_payload(conn, &mut self.parser_buf)?
|
||||||
{
|
{
|
||||||
Async::Ready(Some(b)) => {
|
Async::Ready(Some(b)) => {
|
||||||
if let Some(ref mut decompress) = self.decompress {
|
if let Some(ref mut decompress) = self.decompress {
|
||||||
@@ -314,6 +330,7 @@ impl Pipeline {
|
|||||||
if let Some(mut decompress) = self.decompress.take() {
|
if let Some(mut decompress) = self.decompress.take() {
|
||||||
let res = decompress.feed_eof();
|
let res = decompress.feed_eof();
|
||||||
if let Some(b) = res? {
|
if let Some(b) = res? {
|
||||||
|
self.release_conn();
|
||||||
return Ok(Async::Ready(Some(b)))
|
return Ok(Async::Ready(Some(b)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -321,13 +338,14 @@ impl Pipeline {
|
|||||||
if need_run {
|
if need_run {
|
||||||
Ok(Async::NotReady)
|
Ok(Async::NotReady)
|
||||||
} else {
|
} else {
|
||||||
|
self.release_conn();
|
||||||
Ok(Async::Ready(None))
|
Ok(Async::Ready(None))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn poll_write(&mut self) -> Poll<(), Error> {
|
fn poll_write(&mut self) -> Poll<(), Error> {
|
||||||
if self.write_state == RunningState::Done {
|
if self.write_state == RunningState::Done || self.conn.is_none() {
|
||||||
return Ok(Async::Ready(()))
|
return Ok(Async::Ready(()))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -416,7 +434,7 @@ impl Pipeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// flush io but only if we need to
|
// flush io but only if we need to
|
||||||
match self.writer.poll_completed(&mut self.conn, false) {
|
match self.writer.poll_completed(self.conn.as_mut().unwrap(), false) {
|
||||||
Ok(Async::Ready(_)) => {
|
Ok(Async::Ready(_)) => {
|
||||||
if self.disconnected {
|
if self.disconnected {
|
||||||
self.write_state = RunningState::Done;
|
self.write_state = RunningState::Done;
|
||||||
|
@@ -599,3 +599,19 @@ fn parts<'a>(parts: &'a mut Option<ClientRequest>, err: &Option<HttpError>)
|
|||||||
}
|
}
|
||||||
parts.as_mut()
|
parts.as_mut()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl fmt::Debug for ClientRequestBuilder {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
if let Some(ref parts) = self.request {
|
||||||
|
let res = write!(f, "\nClientRequestBuilder {:?} {}:{}\n",
|
||||||
|
parts.version, parts.method, parts.uri);
|
||||||
|
let _ = write!(f, " headers:\n");
|
||||||
|
for (key, val) in parts.headers.iter() {
|
||||||
|
let _ = write!(f, " {:?}: {:?}\n", key, val);
|
||||||
|
}
|
||||||
|
res
|
||||||
|
} else {
|
||||||
|
write!(f, "ClientRequestBuilder(Consumed)")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@@ -13,10 +13,11 @@ use http::header::{HeaderValue, DATE,
|
|||||||
CONNECTION, CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING};
|
CONNECTION, CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING};
|
||||||
use flate2::Compression;
|
use flate2::Compression;
|
||||||
use flate2::write::{GzEncoder, DeflateEncoder};
|
use flate2::write::{GzEncoder, DeflateEncoder};
|
||||||
|
#[cfg(feature="brotli")]
|
||||||
use brotli2::write::BrotliEncoder;
|
use brotli2::write::BrotliEncoder;
|
||||||
|
|
||||||
use body::{Body, Binary};
|
use body::{Body, Binary};
|
||||||
use headers::ContentEncoding;
|
use header::ContentEncoding;
|
||||||
use server::WriterState;
|
use server::WriterState;
|
||||||
use server::shared::SharedBytes;
|
use server::shared::SharedBytes;
|
||||||
use server::encoding::{ContentEncoder, TransferEncoding};
|
use server::encoding::{ContentEncoder, TransferEncoding};
|
||||||
@@ -112,7 +113,9 @@ impl HttpClientWriter {
|
|||||||
|
|
||||||
// status line
|
// status line
|
||||||
let _ = write!(buffer, "{} {} {:?}\r\n",
|
let _ = write!(buffer, "{} {} {:?}\r\n",
|
||||||
msg.method(), msg.uri().path(), msg.version());
|
msg.method(),
|
||||||
|
msg.uri().path_and_query().map(|u| u.as_str()).unwrap_or("/"),
|
||||||
|
msg.version());
|
||||||
|
|
||||||
// write headers
|
// write headers
|
||||||
for (key, value) in msg.headers() {
|
for (key, value) in msg.headers() {
|
||||||
@@ -213,6 +216,7 @@ fn content_encoder(buf: SharedBytes, req: &mut ClientRequest) -> ContentEncoder
|
|||||||
DeflateEncoder::new(transfer, Compression::default())),
|
DeflateEncoder::new(transfer, Compression::default())),
|
||||||
ContentEncoding::Gzip => ContentEncoder::Gzip(
|
ContentEncoding::Gzip => ContentEncoder::Gzip(
|
||||||
GzEncoder::new(transfer, Compression::default())),
|
GzEncoder::new(transfer, Compression::default())),
|
||||||
|
#[cfg(feature="brotli")]
|
||||||
ContentEncoding::Br => ContentEncoder::Br(
|
ContentEncoding::Br => ContentEncoder::Br(
|
||||||
BrotliEncoder::new(transfer, 5)),
|
BrotliEncoder::new(transfer, 5)),
|
||||||
ContentEncoding::Identity => ContentEncoder::Identity(transfer),
|
ContentEncoding::Identity => ContentEncoder::Identity(transfer),
|
||||||
@@ -262,6 +266,7 @@ fn content_encoder(buf: SharedBytes, req: &mut ClientRequest) -> ContentEncoder
|
|||||||
DeflateEncoder::new(transfer, Compression::default())),
|
DeflateEncoder::new(transfer, Compression::default())),
|
||||||
ContentEncoding::Gzip => ContentEncoder::Gzip(
|
ContentEncoding::Gzip => ContentEncoder::Gzip(
|
||||||
GzEncoder::new(transfer, Compression::default())),
|
GzEncoder::new(transfer, Compression::default())),
|
||||||
|
#[cfg(feature="brotli")]
|
||||||
ContentEncoding::Br => ContentEncoder::Br(
|
ContentEncoding::Br => ContentEncoder::Br(
|
||||||
BrotliEncoder::new(transfer, 5)),
|
BrotliEncoder::new(transfer, 5)),
|
||||||
ContentEncoding::Identity | ContentEncoding::Auto => ContentEncoder::Identity(transfer),
|
ContentEncoding::Identity | ContentEncoding::Auto => ContentEncoder::Identity(transfer),
|
||||||
|
116
src/fs.rs
116
src/fs.rs
@@ -21,11 +21,11 @@ use mime_guess::get_mime_type;
|
|||||||
use header;
|
use header;
|
||||||
use error::Error;
|
use error::Error;
|
||||||
use param::FromParam;
|
use param::FromParam;
|
||||||
use handler::{Handler, Responder};
|
use handler::{Handler, RouteHandler, WrapHandler, Responder, Reply};
|
||||||
use httpmessage::HttpMessage;
|
use httpmessage::HttpMessage;
|
||||||
use httprequest::HttpRequest;
|
use httprequest::HttpRequest;
|
||||||
use httpresponse::HttpResponse;
|
use httpresponse::HttpResponse;
|
||||||
use httpcodes::{HttpOk, HttpFound, HttpMethodNotAllowed};
|
use httpcodes::{HttpOk, HttpFound, HttpNotFound, HttpMethodNotAllowed};
|
||||||
|
|
||||||
/// A file with an associated name; responds with the Content-Type based on the
|
/// A file with an associated name; responds with the Content-Type based on the
|
||||||
/// file extension.
|
/// file extension.
|
||||||
@@ -36,6 +36,7 @@ pub struct NamedFile {
|
|||||||
md: Metadata,
|
md: Metadata,
|
||||||
modified: Option<SystemTime>,
|
modified: Option<SystemTime>,
|
||||||
cpu_pool: Option<CpuPool>,
|
cpu_pool: Option<CpuPool>,
|
||||||
|
only_get: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl NamedFile {
|
impl NamedFile {
|
||||||
@@ -54,7 +55,14 @@ impl NamedFile {
|
|||||||
let path = path.as_ref().to_path_buf();
|
let path = path.as_ref().to_path_buf();
|
||||||
let modified = md.modified().ok();
|
let modified = md.modified().ok();
|
||||||
let cpu_pool = None;
|
let cpu_pool = None;
|
||||||
Ok(NamedFile{path, file, md, modified, cpu_pool})
|
Ok(NamedFile{path, file, md, modified, cpu_pool, only_get: false})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Allow only GET and HEAD methods
|
||||||
|
#[inline]
|
||||||
|
pub fn only_get(mut self) -> Self {
|
||||||
|
self.only_get = true;
|
||||||
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns reference to the underlying `File` object.
|
/// Returns reference to the underlying `File` object.
|
||||||
@@ -168,7 +176,7 @@ impl Responder for NamedFile {
|
|||||||
type Error = io::Error;
|
type Error = io::Error;
|
||||||
|
|
||||||
fn respond_to(self, req: HttpRequest) -> Result<HttpResponse, io::Error> {
|
fn respond_to(self, req: HttpRequest) -> Result<HttpResponse, io::Error> {
|
||||||
if *req.method() != Method::GET && *req.method() != Method::HEAD {
|
if self.only_get && *req.method() != Method::GET && *req.method() != Method::HEAD {
|
||||||
return Ok(HttpMethodNotAllowed.build()
|
return Ok(HttpMethodNotAllowed.build()
|
||||||
.header(header::http::CONTENT_TYPE, "text/plain")
|
.header(header::http::CONTENT_TYPE, "text/plain")
|
||||||
.header(header::http::ALLOW, "GET, HEAD")
|
.header(header::http::ALLOW, "GET, HEAD")
|
||||||
@@ -215,7 +223,9 @@ impl Responder for NamedFile {
|
|||||||
return Ok(resp.status(StatusCode::NOT_MODIFIED).finish().unwrap())
|
return Ok(resp.status(StatusCode::NOT_MODIFIED).finish().unwrap())
|
||||||
}
|
}
|
||||||
|
|
||||||
if *req.method() == Method::GET {
|
if *req.method() == Method::HEAD {
|
||||||
|
Ok(resp.finish().unwrap())
|
||||||
|
} else {
|
||||||
let reader = ChunkedReadFile {
|
let reader = ChunkedReadFile {
|
||||||
size: self.md.len(),
|
size: self.md.len(),
|
||||||
offset: 0,
|
offset: 0,
|
||||||
@@ -224,8 +234,6 @@ impl Responder for NamedFile {
|
|||||||
fut: None,
|
fut: None,
|
||||||
};
|
};
|
||||||
Ok(resp.streaming(reader).unwrap())
|
Ok(resp.streaming(reader).unwrap())
|
||||||
} else {
|
|
||||||
Ok(resp.finish().unwrap())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -354,27 +362,6 @@ impl Responder for Directory {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This enum represents all filesystem elements.
|
|
||||||
pub enum FilesystemElement {
|
|
||||||
File(NamedFile),
|
|
||||||
Directory(Directory),
|
|
||||||
Redirect(HttpResponse),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Responder for FilesystemElement {
|
|
||||||
type Item = HttpResponse;
|
|
||||||
type Error = io::Error;
|
|
||||||
|
|
||||||
fn respond_to(self, req: HttpRequest) -> Result<HttpResponse, io::Error> {
|
|
||||||
match self {
|
|
||||||
FilesystemElement::File(file) => file.respond_to(req),
|
|
||||||
FilesystemElement::Directory(dir) => dir.respond_to(req),
|
|
||||||
FilesystemElement::Redirect(resp) => Ok(resp),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/// Static files handling
|
/// Static files handling
|
||||||
///
|
///
|
||||||
/// `StaticFile` handler must be registered with `Application::handler()` method,
|
/// `StaticFile` handler must be registered with `Application::handler()` method,
|
||||||
@@ -390,23 +377,24 @@ impl Responder for FilesystemElement {
|
|||||||
/// .finish();
|
/// .finish();
|
||||||
/// }
|
/// }
|
||||||
/// ```
|
/// ```
|
||||||
pub struct StaticFiles {
|
pub struct StaticFiles<S> {
|
||||||
directory: PathBuf,
|
directory: PathBuf,
|
||||||
accessible: bool,
|
accessible: bool,
|
||||||
index: Option<String>,
|
index: Option<String>,
|
||||||
show_index: bool,
|
show_index: bool,
|
||||||
cpu_pool: CpuPool,
|
cpu_pool: CpuPool,
|
||||||
|
default: Box<RouteHandler<S>>,
|
||||||
_chunk_size: usize,
|
_chunk_size: usize,
|
||||||
_follow_symlinks: bool,
|
_follow_symlinks: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl StaticFiles {
|
impl<S: 'static> StaticFiles<S> {
|
||||||
/// Create new `StaticFiles` instance
|
/// Create new `StaticFiles` instance
|
||||||
///
|
///
|
||||||
/// `dir` - base directory
|
/// `dir` - base directory
|
||||||
///
|
///
|
||||||
/// `index` - show index for directory
|
/// `index` - show index for directory
|
||||||
pub fn new<T: Into<PathBuf>>(dir: T, index: bool) -> StaticFiles {
|
pub fn new<T: Into<PathBuf>>(dir: T, index: bool) -> StaticFiles<S> {
|
||||||
let dir = dir.into();
|
let dir = dir.into();
|
||||||
|
|
||||||
let (dir, access) = match dir.canonicalize() {
|
let (dir, access) = match dir.canonicalize() {
|
||||||
@@ -430,6 +418,7 @@ impl StaticFiles {
|
|||||||
index: None,
|
index: None,
|
||||||
show_index: index,
|
show_index: index,
|
||||||
cpu_pool: CpuPool::new(40),
|
cpu_pool: CpuPool::new(40),
|
||||||
|
default: Box::new(WrapHandler::new(|_| HttpNotFound)),
|
||||||
_chunk_size: 0,
|
_chunk_size: 0,
|
||||||
_follow_symlinks: false,
|
_follow_symlinks: false,
|
||||||
}
|
}
|
||||||
@@ -439,28 +428,30 @@ impl StaticFiles {
|
|||||||
///
|
///
|
||||||
/// Redirects to specific index file for directory "/" instead of
|
/// Redirects to specific index file for directory "/" instead of
|
||||||
/// showing files listing.
|
/// showing files listing.
|
||||||
pub fn index_file<T: Into<String>>(mut self, index: T) -> StaticFiles {
|
pub fn index_file<T: Into<String>>(mut self, index: T) -> StaticFiles<S> {
|
||||||
self.index = Some(index.into());
|
self.index = Some(index.into());
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Sets default resource which is used when no matched file could be found.
|
||||||
|
pub fn default_handler<H: Handler<S>>(mut self, handler: H) -> StaticFiles<S> {
|
||||||
|
self.default = Box::new(WrapHandler::new(handler));
|
||||||
|
self
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S> Handler<S> for StaticFiles {
|
impl<S: 'static> Handler<S> for StaticFiles<S> {
|
||||||
type Result = Result<FilesystemElement, io::Error>;
|
type Result = Result<Reply, Error>;
|
||||||
|
|
||||||
fn handle(&mut self, req: HttpRequest<S>) -> Self::Result {
|
fn handle(&mut self, req: HttpRequest<S>) -> Self::Result {
|
||||||
if !self.accessible {
|
if !self.accessible {
|
||||||
Err(io::Error::new(io::ErrorKind::NotFound, "not found"))
|
Ok(self.default.handle(req))
|
||||||
} else {
|
} else {
|
||||||
let path = if let Some(path) = req.match_info().get("tail") {
|
let relpath = match req.match_info().get("tail").map(PathBuf::from_param) {
|
||||||
path
|
Some(Ok(path)) => path,
|
||||||
} else {
|
_ => return Ok(self.default.handle(req))
|
||||||
return Err(io::Error::new(io::ErrorKind::NotFound, "not found"))
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let relpath = PathBuf::from_param(path)
|
|
||||||
.map_err(|_| io::Error::new(io::ErrorKind::NotFound, "not found"))?;
|
|
||||||
|
|
||||||
// full filepath
|
// full filepath
|
||||||
let path = self.directory.join(&relpath).canonicalize()?;
|
let path = self.directory.join(&relpath).canonicalize()?;
|
||||||
|
|
||||||
@@ -474,20 +465,21 @@ impl<S> Handler<S> for StaticFiles {
|
|||||||
new_path.push('/');
|
new_path.push('/');
|
||||||
}
|
}
|
||||||
new_path.push_str(redir_index);
|
new_path.push_str(redir_index);
|
||||||
Ok(FilesystemElement::Redirect(
|
HttpFound.build()
|
||||||
HttpFound
|
|
||||||
.build()
|
|
||||||
.header(header::http::LOCATION, new_path.as_str())
|
.header(header::http::LOCATION, new_path.as_str())
|
||||||
.finish().unwrap()))
|
.finish().unwrap()
|
||||||
|
.respond_to(req.without_state())
|
||||||
} else if self.show_index {
|
} else if self.show_index {
|
||||||
Ok(FilesystemElement::Directory(
|
Directory::new(self.directory.clone(), path)
|
||||||
Directory::new(self.directory.clone(), path)))
|
.respond_to(req.without_state())?
|
||||||
|
.respond_to(req.without_state())
|
||||||
} else {
|
} else {
|
||||||
Err(io::Error::new(io::ErrorKind::NotFound, "not found"))
|
Ok(self.default.handle(req))
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
Ok(FilesystemElement::File(
|
NamedFile::open(path)?.set_cpu_pool(self.cpu_pool.clone())
|
||||||
NamedFile::open(path)?.set_cpu_pool(self.cpu_pool.clone())))
|
.respond_to(req.without_state())?
|
||||||
|
.respond_to(req.without_state())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -517,25 +509,38 @@ mod tests {
|
|||||||
let req = TestRequest::default().method(Method::POST).finish();
|
let req = TestRequest::default().method(Method::POST).finish();
|
||||||
let file = NamedFile::open("Cargo.toml").unwrap();
|
let file = NamedFile::open("Cargo.toml").unwrap();
|
||||||
|
|
||||||
let resp = file.respond_to(req).unwrap();
|
let resp = file.only_get().respond_to(req).unwrap();
|
||||||
assert_eq!(resp.status(), StatusCode::METHOD_NOT_ALLOWED);
|
assert_eq!(resp.status(), StatusCode::METHOD_NOT_ALLOWED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_named_file_any_method() {
|
||||||
|
let req = TestRequest::default().method(Method::POST).finish();
|
||||||
|
let file = NamedFile::open("Cargo.toml").unwrap();
|
||||||
|
let resp = file.respond_to(req).unwrap();
|
||||||
|
assert_eq!(resp.status(), StatusCode::OK);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_static_files() {
|
fn test_static_files() {
|
||||||
let mut st = StaticFiles::new(".", true);
|
let mut st = StaticFiles::new(".", true);
|
||||||
st.accessible = false;
|
st.accessible = false;
|
||||||
assert!(st.handle(HttpRequest::default()).is_err());
|
let resp = st.handle(HttpRequest::default()).respond_to(HttpRequest::default()).unwrap();
|
||||||
|
let resp = resp.as_response().expect("HTTP Response");
|
||||||
|
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
|
||||||
|
|
||||||
st.accessible = true;
|
st.accessible = true;
|
||||||
st.show_index = false;
|
st.show_index = false;
|
||||||
assert!(st.handle(HttpRequest::default()).is_err());
|
let resp = st.handle(HttpRequest::default()).respond_to(HttpRequest::default()).unwrap();
|
||||||
|
let resp = resp.as_response().expect("HTTP Response");
|
||||||
|
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
|
||||||
|
|
||||||
let mut req = HttpRequest::default();
|
let mut req = HttpRequest::default();
|
||||||
req.match_info_mut().add("tail", "");
|
req.match_info_mut().add("tail", "");
|
||||||
|
|
||||||
st.show_index = true;
|
st.show_index = true;
|
||||||
let resp = st.handle(req).respond_to(HttpRequest::default()).unwrap();
|
let resp = st.handle(req).respond_to(HttpRequest::default()).unwrap();
|
||||||
|
let resp = resp.as_response().expect("HTTP Response");
|
||||||
assert_eq!(resp.headers().get(header::CONTENT_TYPE).unwrap(), "text/html; charset=utf-8");
|
assert_eq!(resp.headers().get(header::CONTENT_TYPE).unwrap(), "text/html; charset=utf-8");
|
||||||
assert!(resp.body().is_binary());
|
assert!(resp.body().is_binary());
|
||||||
assert!(format!("{:?}", resp.body()).contains("README.md"));
|
assert!(format!("{:?}", resp.body()).contains("README.md"));
|
||||||
@@ -548,6 +553,7 @@ mod tests {
|
|||||||
req.match_info_mut().add("tail", "guide");
|
req.match_info_mut().add("tail", "guide");
|
||||||
|
|
||||||
let resp = st.handle(req).respond_to(HttpRequest::default()).unwrap();
|
let resp = st.handle(req).respond_to(HttpRequest::default()).unwrap();
|
||||||
|
let resp = resp.as_response().expect("HTTP Response");
|
||||||
assert_eq!(resp.status(), StatusCode::FOUND);
|
assert_eq!(resp.status(), StatusCode::FOUND);
|
||||||
assert_eq!(resp.headers().get(header::LOCATION).unwrap(), "/guide/index.html");
|
assert_eq!(resp.headers().get(header::LOCATION).unwrap(), "/guide/index.html");
|
||||||
|
|
||||||
@@ -555,6 +561,7 @@ mod tests {
|
|||||||
req.match_info_mut().add("tail", "guide/");
|
req.match_info_mut().add("tail", "guide/");
|
||||||
|
|
||||||
let resp = st.handle(req).respond_to(HttpRequest::default()).unwrap();
|
let resp = st.handle(req).respond_to(HttpRequest::default()).unwrap();
|
||||||
|
let resp = resp.as_response().expect("HTTP Response");
|
||||||
assert_eq!(resp.status(), StatusCode::FOUND);
|
assert_eq!(resp.status(), StatusCode::FOUND);
|
||||||
assert_eq!(resp.headers().get(header::LOCATION).unwrap(), "/guide/index.html");
|
assert_eq!(resp.headers().get(header::LOCATION).unwrap(), "/guide/index.html");
|
||||||
}
|
}
|
||||||
@@ -566,6 +573,7 @@ mod tests {
|
|||||||
req.match_info_mut().add("tail", "examples/basics");
|
req.match_info_mut().add("tail", "examples/basics");
|
||||||
|
|
||||||
let resp = st.handle(req).respond_to(HttpRequest::default()).unwrap();
|
let resp = st.handle(req).respond_to(HttpRequest::default()).unwrap();
|
||||||
|
let resp = resp.as_response().expect("HTTP Response");
|
||||||
assert_eq!(resp.status(), StatusCode::FOUND);
|
assert_eq!(resp.status(), StatusCode::FOUND);
|
||||||
assert_eq!(resp.headers().get(header::LOCATION).unwrap(), "/examples/basics/Cargo.toml");
|
assert_eq!(resp.headers().get(header::LOCATION).unwrap(), "/examples/basics/Cargo.toml");
|
||||||
}
|
}
|
||||||
|
@@ -119,6 +119,7 @@ pub enum ContentEncoding {
|
|||||||
/// Automatically select encoding based on encoding negotiation
|
/// Automatically select encoding based on encoding negotiation
|
||||||
Auto,
|
Auto,
|
||||||
/// A format using the Brotli algorithm
|
/// A format using the Brotli algorithm
|
||||||
|
#[cfg(feature="brotli")]
|
||||||
Br,
|
Br,
|
||||||
/// A format using the zlib structure with deflate algorithm
|
/// A format using the zlib structure with deflate algorithm
|
||||||
Deflate,
|
Deflate,
|
||||||
@@ -141,6 +142,7 @@ impl ContentEncoding {
|
|||||||
#[inline]
|
#[inline]
|
||||||
pub fn as_str(&self) -> &'static str {
|
pub fn as_str(&self) -> &'static str {
|
||||||
match *self {
|
match *self {
|
||||||
|
#[cfg(feature="brotli")]
|
||||||
ContentEncoding::Br => "br",
|
ContentEncoding::Br => "br",
|
||||||
ContentEncoding::Gzip => "gzip",
|
ContentEncoding::Gzip => "gzip",
|
||||||
ContentEncoding::Deflate => "deflate",
|
ContentEncoding::Deflate => "deflate",
|
||||||
@@ -150,6 +152,7 @@ impl ContentEncoding {
|
|||||||
/// default quality value
|
/// default quality value
|
||||||
pub fn quality(&self) -> f64 {
|
pub fn quality(&self) -> f64 {
|
||||||
match *self {
|
match *self {
|
||||||
|
#[cfg(feature="brotli")]
|
||||||
ContentEncoding::Br => 1.1,
|
ContentEncoding::Br => 1.1,
|
||||||
ContentEncoding::Gzip => 1.0,
|
ContentEncoding::Gzip => 1.0,
|
||||||
ContentEncoding::Deflate => 0.9,
|
ContentEncoding::Deflate => 0.9,
|
||||||
@@ -162,6 +165,7 @@ impl ContentEncoding {
|
|||||||
impl<'a> From<&'a str> for ContentEncoding {
|
impl<'a> From<&'a str> for ContentEncoding {
|
||||||
fn from(s: &'a str) -> ContentEncoding {
|
fn from(s: &'a str) -> ContentEncoding {
|
||||||
match s.trim().to_lowercase().as_ref() {
|
match s.trim().to_lowercase().as_ref() {
|
||||||
|
#[cfg(feature="brotli")]
|
||||||
"br" => ContentEncoding::Br,
|
"br" => ContentEncoding::Br,
|
||||||
"gzip" => ContentEncoding::Gzip,
|
"gzip" => ContentEncoding::Gzip,
|
||||||
"deflate" => ContentEncoding::Deflate,
|
"deflate" => ContentEncoding::Deflate,
|
||||||
|
@@ -79,6 +79,7 @@ extern crate libc;
|
|||||||
extern crate serde;
|
extern crate serde;
|
||||||
extern crate serde_json;
|
extern crate serde_json;
|
||||||
extern crate flate2;
|
extern crate flate2;
|
||||||
|
#[cfg(feature="brotli")]
|
||||||
extern crate brotli2;
|
extern crate brotli2;
|
||||||
extern crate encoding;
|
extern crate encoding;
|
||||||
extern crate percent_encoding;
|
extern crate percent_encoding;
|
||||||
|
@@ -5,6 +5,7 @@ use std::cell::RefCell;
|
|||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
use bytes::{Bytes, BytesMut};
|
use bytes::{Bytes, BytesMut};
|
||||||
use futures::{Async, Poll, Stream};
|
use futures::{Async, Poll, Stream};
|
||||||
|
use futures::task::{Task, current as current_task};
|
||||||
|
|
||||||
use error::PayloadError;
|
use error::PayloadError;
|
||||||
|
|
||||||
@@ -174,6 +175,7 @@ struct Inner {
|
|||||||
need_read: bool,
|
need_read: bool,
|
||||||
items: VecDeque<Bytes>,
|
items: VecDeque<Bytes>,
|
||||||
capacity: usize,
|
capacity: usize,
|
||||||
|
task: Option<Task>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Inner {
|
impl Inner {
|
||||||
@@ -186,6 +188,7 @@ impl Inner {
|
|||||||
items: VecDeque::new(),
|
items: VecDeque::new(),
|
||||||
need_read: true,
|
need_read: true,
|
||||||
capacity: MAX_BUFFER_SIZE,
|
capacity: MAX_BUFFER_SIZE,
|
||||||
|
task: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -204,6 +207,9 @@ impl Inner {
|
|||||||
self.len += data.len();
|
self.len += data.len();
|
||||||
self.items.push_back(data);
|
self.items.push_back(data);
|
||||||
self.need_read = self.len < self.capacity;
|
self.need_read = self.len < self.capacity;
|
||||||
|
if let Some(task) = self.task.take() {
|
||||||
|
task.notify()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
@@ -237,6 +243,12 @@ impl Inner {
|
|||||||
if let Some(data) = self.items.pop_front() {
|
if let Some(data) = self.items.pop_front() {
|
||||||
self.len -= data.len();
|
self.len -= data.len();
|
||||||
self.need_read = self.len < self.capacity;
|
self.need_read = self.len < self.capacity;
|
||||||
|
#[cfg(not(test))]
|
||||||
|
{
|
||||||
|
if self.need_read && self.task.is_none() {
|
||||||
|
self.task = Some(current_task());
|
||||||
|
}
|
||||||
|
}
|
||||||
Ok(Async::Ready(Some(data)))
|
Ok(Async::Ready(Some(data)))
|
||||||
} else if let Some(err) = self.err.take() {
|
} else if let Some(err) = self.err.take() {
|
||||||
Err(err)
|
Err(err)
|
||||||
@@ -244,6 +256,12 @@ impl Inner {
|
|||||||
Ok(Async::Ready(None))
|
Ok(Async::Ready(None))
|
||||||
} else {
|
} else {
|
||||||
self.need_read = true;
|
self.need_read = true;
|
||||||
|
#[cfg(not(test))]
|
||||||
|
{
|
||||||
|
if self.task.is_none() {
|
||||||
|
self.task = Some(current_task());
|
||||||
|
}
|
||||||
|
}
|
||||||
Ok(Async::NotReady)
|
Ok(Async::NotReady)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -481,11 +481,16 @@ impl<S: 'static, H> ProcessResponse<S, H> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// always poll stream or actor for the first time
|
||||||
match self.resp.replace_body(Body::Empty) {
|
match self.resp.replace_body(Body::Empty) {
|
||||||
Body::Streaming(stream) =>
|
Body::Streaming(stream) => {
|
||||||
self.iostate = IOState::Payload(stream),
|
self.iostate = IOState::Payload(stream);
|
||||||
Body::Actor(ctx) =>
|
continue
|
||||||
self.iostate = IOState::Actor(ctx),
|
},
|
||||||
|
Body::Actor(ctx) => {
|
||||||
|
self.iostate = IOState::Actor(ctx);
|
||||||
|
continue
|
||||||
|
},
|
||||||
_ => (),
|
_ => (),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -3,6 +3,7 @@ use std::io::{Read, Write};
|
|||||||
use std::fmt::Write as FmtWrite;
|
use std::fmt::Write as FmtWrite;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
|
|
||||||
|
use bytes::{Bytes, BytesMut, BufMut};
|
||||||
use http::{Version, Method, HttpTryFrom};
|
use http::{Version, Method, HttpTryFrom};
|
||||||
use http::header::{HeaderMap, HeaderValue,
|
use http::header::{HeaderMap, HeaderValue,
|
||||||
ACCEPT_ENCODING, CONNECTION,
|
ACCEPT_ENCODING, CONNECTION,
|
||||||
@@ -10,8 +11,8 @@ use http::header::{HeaderMap, HeaderValue,
|
|||||||
use flate2::Compression;
|
use flate2::Compression;
|
||||||
use flate2::read::GzDecoder;
|
use flate2::read::GzDecoder;
|
||||||
use flate2::write::{GzEncoder, DeflateDecoder, DeflateEncoder};
|
use flate2::write::{GzEncoder, DeflateDecoder, DeflateEncoder};
|
||||||
|
#[cfg(feature="brotli")]
|
||||||
use brotli2::write::{BrotliDecoder, BrotliEncoder};
|
use brotli2::write::{BrotliDecoder, BrotliEncoder};
|
||||||
use bytes::{Bytes, BytesMut, BufMut};
|
|
||||||
|
|
||||||
use header::ContentEncoding;
|
use header::ContentEncoding;
|
||||||
use body::{Body, Binary};
|
use body::{Body, Binary};
|
||||||
@@ -144,6 +145,7 @@ impl PayloadWriter for EncodedPayload {
|
|||||||
pub(crate) enum Decoder {
|
pub(crate) enum Decoder {
|
||||||
Deflate(Box<DeflateDecoder<Writer>>),
|
Deflate(Box<DeflateDecoder<Writer>>),
|
||||||
Gzip(Option<Box<GzDecoder<Wrapper>>>),
|
Gzip(Option<Box<GzDecoder<Wrapper>>>),
|
||||||
|
#[cfg(feature="brotli")]
|
||||||
Br(Box<BrotliDecoder<Writer>>),
|
Br(Box<BrotliDecoder<Writer>>),
|
||||||
Identity,
|
Identity,
|
||||||
}
|
}
|
||||||
@@ -214,6 +216,7 @@ pub(crate) struct PayloadStream {
|
|||||||
impl PayloadStream {
|
impl PayloadStream {
|
||||||
pub fn new(enc: ContentEncoding) -> PayloadStream {
|
pub fn new(enc: ContentEncoding) -> PayloadStream {
|
||||||
let dec = match enc {
|
let dec = match enc {
|
||||||
|
#[cfg(feature="brotli")]
|
||||||
ContentEncoding::Br => Decoder::Br(
|
ContentEncoding::Br => Decoder::Br(
|
||||||
Box::new(BrotliDecoder::new(Writer::new()))),
|
Box::new(BrotliDecoder::new(Writer::new()))),
|
||||||
ContentEncoding::Deflate => Decoder::Deflate(
|
ContentEncoding::Deflate => Decoder::Deflate(
|
||||||
@@ -229,6 +232,7 @@ impl PayloadStream {
|
|||||||
|
|
||||||
pub fn feed_eof(&mut self) -> io::Result<Option<Bytes>> {
|
pub fn feed_eof(&mut self) -> io::Result<Option<Bytes>> {
|
||||||
match self.decoder {
|
match self.decoder {
|
||||||
|
#[cfg(feature="brotli")]
|
||||||
Decoder::Br(ref mut decoder) => {
|
Decoder::Br(ref mut decoder) => {
|
||||||
match decoder.finish() {
|
match decoder.finish() {
|
||||||
Ok(mut writer) => {
|
Ok(mut writer) => {
|
||||||
@@ -278,6 +282,7 @@ impl PayloadStream {
|
|||||||
|
|
||||||
pub fn feed_data(&mut self, data: Bytes) -> io::Result<Option<Bytes>> {
|
pub fn feed_data(&mut self, data: Bytes) -> io::Result<Option<Bytes>> {
|
||||||
match self.decoder {
|
match self.decoder {
|
||||||
|
#[cfg(feature="brotli")]
|
||||||
Decoder::Br(ref mut decoder) => {
|
Decoder::Br(ref mut decoder) => {
|
||||||
match decoder.write_all(&data) {
|
match decoder.write_all(&data) {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
@@ -346,6 +351,7 @@ impl PayloadStream {
|
|||||||
pub(crate) enum ContentEncoder {
|
pub(crate) enum ContentEncoder {
|
||||||
Deflate(DeflateEncoder<TransferEncoding>),
|
Deflate(DeflateEncoder<TransferEncoding>),
|
||||||
Gzip(GzEncoder<TransferEncoding>),
|
Gzip(GzEncoder<TransferEncoding>),
|
||||||
|
#[cfg(feature="brotli")]
|
||||||
Br(BrotliEncoder<TransferEncoding>),
|
Br(BrotliEncoder<TransferEncoding>),
|
||||||
Identity(TransferEncoding),
|
Identity(TransferEncoding),
|
||||||
}
|
}
|
||||||
@@ -412,6 +418,7 @@ impl ContentEncoder {
|
|||||||
DeflateEncoder::new(transfer, Compression::default())),
|
DeflateEncoder::new(transfer, Compression::default())),
|
||||||
ContentEncoding::Gzip => ContentEncoder::Gzip(
|
ContentEncoding::Gzip => ContentEncoder::Gzip(
|
||||||
GzEncoder::new(transfer, Compression::default())),
|
GzEncoder::new(transfer, Compression::default())),
|
||||||
|
#[cfg(feature="brotli")]
|
||||||
ContentEncoding::Br => ContentEncoder::Br(
|
ContentEncoding::Br => ContentEncoder::Br(
|
||||||
BrotliEncoder::new(transfer, 5)),
|
BrotliEncoder::new(transfer, 5)),
|
||||||
ContentEncoding::Identity => ContentEncoder::Identity(transfer),
|
ContentEncoding::Identity => ContentEncoder::Identity(transfer),
|
||||||
@@ -464,6 +471,7 @@ impl ContentEncoder {
|
|||||||
DeflateEncoder::new(transfer, Compression::default())),
|
DeflateEncoder::new(transfer, Compression::default())),
|
||||||
ContentEncoding::Gzip => ContentEncoder::Gzip(
|
ContentEncoding::Gzip => ContentEncoder::Gzip(
|
||||||
GzEncoder::new(transfer, Compression::default())),
|
GzEncoder::new(transfer, Compression::default())),
|
||||||
|
#[cfg(feature="brotli")]
|
||||||
ContentEncoding::Br => ContentEncoder::Br(
|
ContentEncoding::Br => ContentEncoder::Br(
|
||||||
BrotliEncoder::new(transfer, 5)),
|
BrotliEncoder::new(transfer, 5)),
|
||||||
ContentEncoding::Identity | ContentEncoding::Auto =>
|
ContentEncoding::Identity | ContentEncoding::Auto =>
|
||||||
@@ -538,6 +546,7 @@ impl ContentEncoder {
|
|||||||
#[inline]
|
#[inline]
|
||||||
pub fn is_eof(&self) -> bool {
|
pub fn is_eof(&self) -> bool {
|
||||||
match *self {
|
match *self {
|
||||||
|
#[cfg(feature="brotli")]
|
||||||
ContentEncoder::Br(ref encoder) => encoder.get_ref().is_eof(),
|
ContentEncoder::Br(ref encoder) => encoder.get_ref().is_eof(),
|
||||||
ContentEncoder::Deflate(ref encoder) => encoder.get_ref().is_eof(),
|
ContentEncoder::Deflate(ref encoder) => encoder.get_ref().is_eof(),
|
||||||
ContentEncoder::Gzip(ref encoder) => encoder.get_ref().is_eof(),
|
ContentEncoder::Gzip(ref encoder) => encoder.get_ref().is_eof(),
|
||||||
@@ -552,6 +561,7 @@ impl ContentEncoder {
|
|||||||
self, ContentEncoder::Identity(TransferEncoding::eof(SharedBytes::empty())));
|
self, ContentEncoder::Identity(TransferEncoding::eof(SharedBytes::empty())));
|
||||||
|
|
||||||
match encoder {
|
match encoder {
|
||||||
|
#[cfg(feature="brotli")]
|
||||||
ContentEncoder::Br(encoder) => {
|
ContentEncoder::Br(encoder) => {
|
||||||
match encoder.finish() {
|
match encoder.finish() {
|
||||||
Ok(mut writer) => {
|
Ok(mut writer) => {
|
||||||
@@ -594,6 +604,7 @@ impl ContentEncoder {
|
|||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
pub fn write(&mut self, data: Binary) -> Result<(), io::Error> {
|
pub fn write(&mut self, data: Binary) -> Result<(), io::Error> {
|
||||||
match *self {
|
match *self {
|
||||||
|
#[cfg(feature="brotli")]
|
||||||
ContentEncoder::Br(ref mut encoder) => {
|
ContentEncoder::Br(ref mut encoder) => {
|
||||||
match encoder.write_all(data.as_ref()) {
|
match encoder.write_all(data.as_ref()) {
|
||||||
Ok(_) => Ok(()),
|
Ok(_) => Ok(()),
|
||||||
|
@@ -32,8 +32,10 @@ const MAX_PIPELINED_MESSAGES: usize = 16;
|
|||||||
|
|
||||||
bitflags! {
|
bitflags! {
|
||||||
struct Flags: u8 {
|
struct Flags: u8 {
|
||||||
|
const STARTED = 0b0000_0001;
|
||||||
const ERROR = 0b0000_0010;
|
const ERROR = 0b0000_0010;
|
||||||
const KEEPALIVE = 0b0000_0100;
|
const KEEPALIVE = 0b0000_0100;
|
||||||
|
const SHUTDOWN = 0b0000_1000;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -94,17 +96,32 @@ impl<T, H> Http1<T, H>
|
|||||||
match timer.poll() {
|
match timer.poll() {
|
||||||
Ok(Async::Ready(_)) => {
|
Ok(Async::Ready(_)) => {
|
||||||
trace!("Keep-alive timeout, close connection");
|
trace!("Keep-alive timeout, close connection");
|
||||||
return Ok(Async::Ready(()))
|
self.flags.insert(Flags::SHUTDOWN);
|
||||||
}
|
}
|
||||||
Ok(Async::NotReady) => (),
|
Ok(Async::NotReady) => (),
|
||||||
Err(_) => unreachable!(),
|
Err(_) => unreachable!(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// shutdown
|
||||||
|
if self.flags.contains(Flags::SHUTDOWN) {
|
||||||
|
match self.stream.poll_completed(true) {
|
||||||
|
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||||
|
Ok(Async::Ready(_)) => return Ok(Async::Ready(())),
|
||||||
|
Err(err) => {
|
||||||
|
debug!("Error sending data: {}", err);
|
||||||
|
return Err(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match self.poll_io()? {
|
match self.poll_io()? {
|
||||||
Async::Ready(true) => (),
|
Async::Ready(true) => (),
|
||||||
Async::Ready(false) => return Ok(Async::Ready(())),
|
Async::Ready(false) => {
|
||||||
|
self.flags.insert(Flags::SHUTDOWN);
|
||||||
|
return self.poll()
|
||||||
|
},
|
||||||
Async::NotReady => return Ok(Async::NotReady),
|
Async::NotReady => return Ok(Async::NotReady),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -120,6 +137,8 @@ impl<T, H> Http1<T, H>
|
|||||||
match self.reader.parse(self.stream.get_mut(),
|
match self.reader.parse(self.stream.get_mut(),
|
||||||
&mut self.read_buf, &self.settings) {
|
&mut self.read_buf, &self.settings) {
|
||||||
Ok(Async::Ready(mut req)) => {
|
Ok(Async::Ready(mut req)) => {
|
||||||
|
self.flags.insert(Flags::STARTED);
|
||||||
|
|
||||||
// set remote addr
|
// set remote addr
|
||||||
req.set_peer_addr(self.addr);
|
req.set_peer_addr(self.addr);
|
||||||
|
|
||||||
@@ -260,7 +279,8 @@ impl<T, H> Http1<T, H>
|
|||||||
}
|
}
|
||||||
|
|
||||||
// check stream state
|
// check stream state
|
||||||
match self.stream.poll_completed(self.tasks.is_empty()) {
|
if self.flags.contains(Flags::STARTED) {
|
||||||
|
match self.stream.poll_completed(false) {
|
||||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
debug!("Error sending data: {}", err);
|
debug!("Error sending data: {}", err);
|
||||||
@@ -268,13 +288,15 @@ impl<T, H> Http1<T, H>
|
|||||||
}
|
}
|
||||||
_ => (),
|
_ => (),
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// deal with keep-alive
|
// deal with keep-alive
|
||||||
if self.tasks.is_empty() {
|
if self.tasks.is_empty() {
|
||||||
// no keep-alive situations
|
// no keep-alive situations
|
||||||
if self.flags.contains(Flags::ERROR)
|
if (self.flags.contains(Flags::ERROR)
|
||||||
|| !self.flags.contains(Flags::KEEPALIVE)
|
|| !self.flags.contains(Flags::KEEPALIVE)
|
||||||
|| !self.settings.keep_alive_enabled()
|
|| !self.settings.keep_alive_enabled()) &&
|
||||||
|
self.flags.contains(Flags::STARTED)
|
||||||
{
|
{
|
||||||
return Ok(Async::Ready(false))
|
return Ok(Async::Ready(false))
|
||||||
}
|
}
|
||||||
|
@@ -41,6 +41,7 @@ pub struct HttpServer<H> where H: IntoHttpHandler + 'static
|
|||||||
exit: bool,
|
exit: bool,
|
||||||
shutdown_timeout: u16,
|
shutdown_timeout: u16,
|
||||||
signals: Option<Addr<Syn, signal::ProcessSignals>>,
|
signals: Option<Addr<Syn, signal::ProcessSignals>>,
|
||||||
|
no_http2: bool,
|
||||||
no_signals: bool,
|
no_signals: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -89,6 +90,7 @@ impl<H> HttpServer<H> where H: IntoHttpHandler + 'static
|
|||||||
exit: false,
|
exit: false,
|
||||||
shutdown_timeout: 30,
|
shutdown_timeout: 30,
|
||||||
signals: None,
|
signals: None,
|
||||||
|
no_http2: false,
|
||||||
no_signals: false,
|
no_signals: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -170,6 +172,12 @@ impl<H> HttpServer<H> where H: IntoHttpHandler + 'static
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Disable `HTTP/2` support
|
||||||
|
pub fn no_http2(mut self) -> Self {
|
||||||
|
self.no_http2 = true;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
/// Get addresses of bound sockets.
|
/// Get addresses of bound sockets.
|
||||||
pub fn addrs(&self) -> Vec<net::SocketAddr> {
|
pub fn addrs(&self) -> Vec<net::SocketAddr> {
|
||||||
self.sockets.iter().map(|s| s.0).collect()
|
self.sockets.iter().map(|s| s.0).collect()
|
||||||
@@ -396,6 +404,7 @@ impl<H: IntoHttpHandler> HttpServer<H>
|
|||||||
Err(io::Error::new(io::ErrorKind::Other, "No socket addresses are bound"))
|
Err(io::Error::new(io::ErrorKind::Other, "No socket addresses are bound"))
|
||||||
} else {
|
} else {
|
||||||
// alpn support
|
// alpn support
|
||||||
|
if !self.no_http2 {
|
||||||
builder.set_alpn_protos(b"\x02h2\x08http/1.1")?;
|
builder.set_alpn_protos(b"\x02h2\x08http/1.1")?;
|
||||||
builder.set_alpn_select_callback(|_, protos| {
|
builder.set_alpn_select_callback(|_, protos| {
|
||||||
const H2: &[u8] = b"\x02h2";
|
const H2: &[u8] = b"\x02h2";
|
||||||
@@ -405,6 +414,7 @@ impl<H: IntoHttpHandler> HttpServer<H>
|
|||||||
Err(AlpnError::NOACK)
|
Err(AlpnError::NOACK)
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
}
|
||||||
|
|
||||||
let (tx, rx) = mpsc::unbounded();
|
let (tx, rx) = mpsc::unbounded();
|
||||||
let acceptor = builder.build();
|
let acceptor = builder.build();
|
||||||
@@ -734,6 +744,13 @@ fn start_accept_thread(
|
|||||||
workers[next].0, info.clone()));
|
workers[next].0, info.clone()));
|
||||||
msg = err.into_inner();
|
msg = err.into_inner();
|
||||||
workers.swap_remove(next);
|
workers.swap_remove(next);
|
||||||
|
if workers.is_empty() {
|
||||||
|
error!("No workers");
|
||||||
|
thread::sleep(sleep);
|
||||||
|
break
|
||||||
|
} else if workers.len() <= next {
|
||||||
|
next = 0;
|
||||||
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -204,7 +204,8 @@ impl StreamHandlerType {
|
|||||||
} else {
|
} else {
|
||||||
false
|
false
|
||||||
};
|
};
|
||||||
Arbiter::handle().spawn(HttpChannel::new(h, io, peer, http2));
|
Arbiter::handle().spawn(
|
||||||
|
HttpChannel::new(h, io, peer, http2));
|
||||||
},
|
},
|
||||||
Err(err) =>
|
Err(err) =>
|
||||||
trace!("Error during handling tls connection: {}", err),
|
trace!("Error during handling tls connection: {}", err),
|
||||||
|
@@ -87,9 +87,12 @@ impl TestServer {
|
|||||||
let sys = System::new("actix-test-server");
|
let sys = System::new("actix-test-server");
|
||||||
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
|
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
|
||||||
let local_addr = tcp.local_addr().unwrap();
|
let local_addr = tcp.local_addr().unwrap();
|
||||||
let tcp = TcpListener::from_listener(tcp, &local_addr, Arbiter::handle()).unwrap();
|
let tcp = TcpListener::from_listener(
|
||||||
|
tcp, &local_addr, Arbiter::handle()).unwrap();
|
||||||
|
|
||||||
HttpServer::new(factory).disable_signals().start_incoming(tcp.incoming(), false);
|
HttpServer::new(factory)
|
||||||
|
.disable_signals()
|
||||||
|
.start_incoming(tcp.incoming(), false);
|
||||||
|
|
||||||
tx.send((Arbiter::system(), local_addr)).unwrap();
|
tx.send((Arbiter::system(), local_addr)).unwrap();
|
||||||
let _ = sys.run();
|
let _ = sys.run();
|
||||||
|
@@ -66,6 +66,21 @@ fn test_simple() {
|
|||||||
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
|
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_with_query_parameter() {
|
||||||
|
let mut srv = test::TestServer::new(
|
||||||
|
|app| app.handler(|req: HttpRequest| match req.query().get("qp") {
|
||||||
|
Some(_) => httpcodes::HTTPOk.build().finish(),
|
||||||
|
None => httpcodes::HTTPBadRequest.build().finish(),
|
||||||
|
}));
|
||||||
|
|
||||||
|
let request = srv.get().uri(srv.url("/?qp=5").as_str()).finish().unwrap();
|
||||||
|
|
||||||
|
let response = srv.execute(request.send()).unwrap();
|
||||||
|
assert!(response.status().is_success());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_no_decompress() {
|
fn test_no_decompress() {
|
||||||
let mut srv = test::TestServer::new(
|
let mut srv = test::TestServer::new(
|
||||||
|
@@ -743,7 +743,7 @@ fn test_h2() {
|
|||||||
})
|
})
|
||||||
});
|
});
|
||||||
let _res = core.run(tcp);
|
let _res = core.run(tcp);
|
||||||
// assert_eq!(_res.unwrap(), Bytes::from_static(STR.as_ref()));
|
// assert_eq!(res.unwrap(), Bytes::from_static(STR.as_ref()));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
Reference in New Issue
Block a user