1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-07-16 14:45:47 +02:00

Compare commits

...

29 Commits

Author SHA1 Message Date
Nikolay Kim
1fe4315c94 use actix 0.5.4 2018-03-16 13:37:47 -07:00
Nikolay Kim
381b90e9a1 bump version 2018-03-16 12:31:29 -07:00
Nikolay Kim
2d18dba40a fix compilation 2018-03-16 12:28:08 -07:00
Nikolay Kim
d2693d58a8 clippy warnings 2018-03-16 12:12:55 -07:00
Nikolay Kim
84bf282c17 add basic client connection pooling 2018-03-16 12:04:01 -07:00
Nikolay Kim
b15b5e5246 check number of available workers 2018-03-16 11:17:27 -07:00
Douman
52b3b0c362 Merge pull request #119 from DoumanAsh/default_static_files
Add default resource for StaticFiles
2018-03-16 20:12:07 +03:00
Nikolay Kim
64c4cefa8f Merge branch 'master' into default_static_files 2018-03-16 09:31:36 -07:00
Nikolay Kim
7e8b231f57 disable test 2018-03-16 09:13:36 -07:00
Douman
8a344d0c94 Add default resource for StaticFiles 2018-03-16 19:04:36 +03:00
Nikolay Kim
4096089a3f allow to disable http/2 support 2018-03-16 08:48:44 -07:00
Nikolay Kim
b16f2d5f05 proper check for actor context poll 2018-03-16 08:04:26 -07:00
Nikolay Kim
5baf15822a always start actors 2018-03-16 07:46:27 -07:00
Nikolay Kim
5368ce823e Merge pull request #123 from h416/patch-1
fix typo
2018-03-16 05:31:10 -07:00
h416
4effdf065b fix typo 2018-03-16 19:03:16 +09:00
Nikolay Kim
61970ab190 always poll stream or actor for the first time 2018-03-15 17:11:49 -07:00
Nikolay Kim
484b00a0f9 Merge branch 'master' of github.com:actix/actix-web 2018-03-15 16:55:33 -07:00
Nikolay Kim
73bf2068aa allow to use NamedFile with any request method 2018-03-15 16:55:22 -07:00
Nikolay Kim
1cda949204 Merge pull request #122 from mockersf/test_qp
test for query parameters in client
2018-03-14 16:10:31 -07:00
François Mockers
ad6b823255 test for query parameters in client 2018-03-14 21:45:49 +01:00
Nikolay Kim
0f064db31d Move brotli encoding to a feature 2018-03-13 17:21:22 -07:00
Nikolay Kim
fd0bb54469 add debug formatter for ClientRequestBuilder 2018-03-13 15:09:05 -07:00
Nikolay Kim
e27bbaa55c Update CHANGES.md 2018-03-13 13:15:21 -07:00
Nikolay Kim
8a50eae1e2 Merge pull request #121 from glademiller/master
Send Query Parameters in client requests
2018-03-13 13:14:51 -07:00
Glade Miller
38080f67b3 If no path is available from the URI request / 2018-03-13 13:35:11 -06:00
Glade Miller
08504e0892 Move path call inline into write 2018-03-13 13:26:13 -06:00
Glade Miller
401c0ad809 https://github.com/actix/actix-web/issues/120 - Send Query Parameters in client requests 2018-03-13 13:17:55 -06:00
Nikolay Kim
b4b0deb7fa Wake payload reading task when data is available 2018-03-12 16:29:13 -07:00
Nikolay Kim
05ff35d383 Fix server keep-alive handling 2018-03-12 16:16:17 -07:00
19 changed files with 466 additions and 127 deletions

View File

@@ -1,5 +1,22 @@
# Changes
## 0.4.9 (2018-03-16)
* Allow to disable http/2 support
* Wake payload reading task when data is available
* Fix server keep-alive handling
* Send Query Parameters in client requests #120
* Move brotli encoding to a feature
* Add option of default handler for `StaticFiles` handler #57
* Add basic client connection pooling
## 0.4.8 (2018-03-12)
* Allow to set read buffer capacity for server request
@@ -42,7 +59,7 @@
* Better support for `NamedFile` type
* Add `ResponseError` impl for `SendRequestError`. This improves ergonomics of the client.
* Add native-tls support for client
* Allow client connection timeout to be set #108

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-web"
version = "0.4.8"
version = "0.4.9"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix web is a simple, pragmatic, extremely fast, web framework for Rust."
readme = "README.md"
@@ -27,7 +27,7 @@ name = "actix_web"
path = "src/lib.rs"
[features]
default = ["session"]
default = ["session", "brotli"]
# tls
tls = ["native-tls", "tokio-tls"]
@@ -38,12 +38,14 @@ alpn = ["openssl", "openssl/v102", "openssl/v110", "tokio-openssl"]
# sessions
session = ["cookie/secure"]
# brotli encoding
brotli = ["brotli2"]
[dependencies]
actix = "^0.5.2"
actix = "^0.5.4"
base64 = "0.9"
bitflags = "1.0"
brotli2 = "^0.3.2"
failure = "0.1.1"
flate2 = "1.0"
h2 = "0.1"
@@ -67,6 +69,7 @@ encoding = "0.2"
language-tags = "0.2"
url = { version="1.7", features=["query_encoding"] }
cookie = { version="0.10", features=["percent-encode"] }
brotli2 = { version="^0.3.2", optional = true }
# io
mio = "^0.6.13"

View File

@@ -1,4 +1,4 @@
# websockect
# websocket
Simple echo websocket server.

View File

@@ -1,15 +1,18 @@
use std::{io, time};
use std::{fmt, io, time};
use std::cell::RefCell;
use std::rc::Rc;
use std::net::Shutdown;
use std::time::Duration;
use std::time::{Duration, Instant};
use std::collections::{HashMap, VecDeque};
use actix::{fut, Actor, ActorFuture, Context,
use actix::{fut, Actor, ActorFuture, Context, AsyncContext,
Handler, Message, ActorResponse, Supervised};
use actix::registry::ArbiterService;
use actix::fut::WrapFuture;
use actix::actors::{Connector, ConnectorError, Connect as ResolveConnect};
use http::{Uri, HttpTryFrom, Error as HttpError};
use futures::Poll;
use futures::{Async, Poll};
use tokio_io::{AsyncRead, AsyncWrite};
#[cfg(feature="alpn")]
@@ -104,10 +107,15 @@ pub struct ClientConnector {
connector: SslConnector,
#[cfg(all(feature="tls", not(feature="alpn")))]
connector: TlsConnector,
pool: Rc<Pool>,
}
impl Actor for ClientConnector {
type Context = Context<ClientConnector>;
fn started(&mut self, ctx: &mut Self::Context) {
self.collect(ctx);
}
}
impl Supervised for ClientConnector {}
@@ -120,19 +128,21 @@ impl Default for ClientConnector {
{
let builder = SslConnector::builder(SslMethod::tls()).unwrap();
ClientConnector {
connector: builder.build()
connector: builder.build(),
pool: Rc::new(Pool::new()),
}
}
#[cfg(all(feature="tls", not(feature="alpn")))]
{
let builder = TlsConnector::builder().unwrap();
ClientConnector {
connector: builder.build().unwrap()
connector: builder.build().unwrap(),
pool: Rc::new(Pool::new()),
}
}
#[cfg(not(any(feature="alpn", feature="tls")))]
ClientConnector {}
ClientConnector {pool: Rc::new(Pool::new())}
}
}
@@ -182,7 +192,12 @@ impl ClientConnector {
/// }
/// ```
pub fn with_connector(connector: SslConnector) -> ClientConnector {
ClientConnector { connector }
ClientConnector { connector, pool: Rc::new(Pool::new()) }
}
fn collect(&mut self, ctx: &mut Context<Self>) {
self.pool.collect();
ctx.run_later(Duration::from_secs(1), |act, ctx| act.collect(ctx));
}
}
@@ -214,10 +229,21 @@ impl Handler<Connect> for ClientConnector {
let host = uri.host().unwrap().to_owned();
let port = uri.port().unwrap_or_else(|| proto.port());
let key = Key {host, port, ssl: proto.is_secure()};
let pool = if proto.is_http() {
if let Some(conn) = self.pool.query(&key) {
return ActorResponse::async(fut::ok(conn))
} else {
Some(Rc::clone(&self.pool))
}
} else {
None
};
ActorResponse::async(
Connector::from_registry()
.send(ResolveConnect::host_and_port(&host, port)
.send(ResolveConnect::host_and_port(&key.host, port)
.timeout(conn_timeout))
.into_actor(self)
.map_err(|_, _, _| ClientConnectorError::Disconnected)
@@ -228,12 +254,14 @@ impl Handler<Connect> for ClientConnector {
Ok(stream) => {
if proto.is_secure() {
fut::Either::A(
_act.connector.connect_async(&host, stream)
_act.connector.connect_async(&key.host, stream)
.map_err(ClientConnectorError::SslError)
.map(|stream| Connection{stream: Box::new(stream)})
.map(|stream| Connection::new(
key, pool, Box::new(stream)))
.into_actor(_act))
} else {
fut::Either::B(fut::ok(Connection{stream: Box::new(stream)}))
fut::Either::B(fut::ok(
Connection::new(key, pool, Box::new(stream))))
}
}
}
@@ -244,12 +272,14 @@ impl Handler<Connect> for ClientConnector {
Ok(stream) => {
if proto.is_secure() {
fut::Either::A(
_act.connector.connect_async(&host, stream)
_act.connector.connect_async(&key.host, stream)
.map_err(ClientConnectorError::SslError)
.map(|stream| Connection{stream: Box::new(stream)})
.map(|stream| Connection::new(
key, pool, Box::new(stream)))
.into_actor(_act))
} else {
fut::Either::B(fut::ok(Connection{stream: Box::new(stream)}))
fut::Either::B(fut::ok(
Connection::new(key, pool, Box::new(stream))))
}
}
}
@@ -261,7 +291,7 @@ impl Handler<Connect> for ClientConnector {
if proto.is_secure() {
fut::err(ClientConnectorError::SslIsNotSupported)
} else {
fut::ok(Connection{stream: Box::new(stream)})
fut::ok(Connection::new(key, pool, Box::new(stream)))
}
}
}
@@ -288,6 +318,13 @@ impl Protocol {
}
}
fn is_http(&self) -> bool {
match *self {
Protocol::Https | Protocol::Http => true,
_ => false,
}
}
fn is_secure(&self) -> bool {
match *self {
Protocol::Https | Protocol::Wss => true,
@@ -303,18 +340,156 @@ impl Protocol {
}
}
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
struct Key {
host: String,
port: u16,
ssl: bool,
}
impl Key {
fn empty() -> Key {
Key{host: String::new(), port: 0, ssl: false}
}
}
#[derive(Debug)]
struct Conn(Instant, Connection);
pub struct Pool {
max_size: usize,
keep_alive: Duration,
max_lifetime: Duration,
pool: RefCell<HashMap<Key, VecDeque<Conn>>>,
to_close: RefCell<Vec<Connection>>,
}
impl Pool {
fn new() -> Pool {
Pool {
max_size: 128,
keep_alive: Duration::from_secs(15),
max_lifetime: Duration::from_secs(75),
pool: RefCell::new(HashMap::new()),
to_close: RefCell::new(Vec::new()),
}
}
fn collect(&self) {
let mut pool = self.pool.borrow_mut();
let mut to_close = self.to_close.borrow_mut();
// check keep-alive
let now = Instant::now();
for conns in pool.values_mut() {
while !conns.is_empty() {
if (now - conns[0].0) > self.keep_alive
|| (now - conns[0].1.ts) > self.max_lifetime
{
let conn = conns.pop_front().unwrap().1;
to_close.push(conn);
} else {
break
}
}
}
// check connections for shutdown
let mut idx = 0;
while idx < to_close.len() {
match AsyncWrite::shutdown(&mut to_close[idx]) {
Ok(Async::NotReady) => idx += 1,
_ => {
to_close.swap_remove(idx);
},
}
}
}
fn query(&self, key: &Key) -> Option<Connection> {
let mut pool = self.pool.borrow_mut();
let mut to_close = self.to_close.borrow_mut();
if let Some(ref mut connections) = pool.get_mut(key) {
let now = Instant::now();
while let Some(conn) = connections.pop_back() {
// check if it still usable
if (now - conn.0) > self.keep_alive
|| (now - conn.1.ts) > self.max_lifetime
{
to_close.push(conn.1);
} else {
let mut conn = conn.1;
let mut buf = [0; 2];
match conn.stream().read(&mut buf) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => (),
Ok(n) if n > 0 => {
to_close.push(conn);
continue
},
Ok(_) | Err(_) => continue,
}
return Some(conn)
}
}
}
None
}
fn release(&self, conn: Connection) {
if (Instant::now() - conn.ts) < self.max_lifetime {
let mut pool = self.pool.borrow_mut();
if !pool.contains_key(&conn.key) {
let key = conn.key.clone();
let mut vec = VecDeque::new();
vec.push_back(Conn(Instant::now(), conn));
pool.insert(key, vec);
} else {
let vec = pool.get_mut(&conn.key).unwrap();
vec.push_back(Conn(Instant::now(), conn));
if vec.len() > self.max_size {
let conn = vec.pop_front().unwrap();
self.to_close.borrow_mut().push(conn.1);
}
}
}
}
}
pub struct Connection {
key: Key,
stream: Box<IoStream>,
pool: Option<Rc<Pool>>,
ts: Instant,
}
impl fmt::Debug for Connection {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Connection {}:{}", self.key.host, self.key.port)
}
}
impl Connection {
fn new(key: Key, pool: Option<Rc<Pool>>, stream: Box<IoStream>) -> Self {
Connection {
key, pool, stream,
ts: Instant::now(),
}
}
pub fn stream(&mut self) -> &mut IoStream {
&mut *self.stream
}
pub fn from_stream<T: IoStream>(io: T) -> Connection {
Connection{stream: Box::new(io)}
Connection::new(Key::empty(), None, Box::new(io))
}
pub fn release(mut self) {
if let Some(pool) = self.pool.take() {
pool.release(self)
}
}
}

View File

@@ -171,7 +171,8 @@ impl Future for SendRequest {
};
let pl = Box::new(Pipeline {
body, conn, writer,
body, writer,
conn: Some(conn),
parser: Some(HttpResponseParser::default()),
parser_buf: BytesMut::new(),
disconnected: false,
@@ -208,7 +209,7 @@ impl Future for SendRequest {
pub(crate) struct Pipeline {
body: IoBody,
conn: Connection,
conn: Option<Connection>,
writer: HttpClientWriter,
parser: Option<HttpResponseParser>,
parser_buf: BytesMut,
@@ -249,30 +250,45 @@ impl RunningState {
impl Pipeline {
fn release_conn(&mut self) {
if let Some(conn) = self.conn.take() {
conn.release()
}
}
#[inline]
pub fn parse(&mut self) -> Poll<ClientResponse, HttpResponseParserError> {
match self.parser.as_mut().unwrap().parse(&mut self.conn, &mut self.parser_buf) {
Ok(Async::Ready(resp)) => {
// check content-encoding
if self.should_decompress {
if let Some(enc) = resp.headers().get(CONTENT_ENCODING) {
if let Ok(enc) = enc.to_str() {
match ContentEncoding::from(enc) {
ContentEncoding::Auto | ContentEncoding::Identity => (),
enc => self.decompress = Some(PayloadStream::new(enc)),
fn parse(&mut self) -> Poll<ClientResponse, HttpResponseParserError> {
if let Some(ref mut conn) = self.conn {
match self.parser.as_mut().unwrap().parse(conn, &mut self.parser_buf) {
Ok(Async::Ready(resp)) => {
// check content-encoding
if self.should_decompress {
if let Some(enc) = resp.headers().get(CONTENT_ENCODING) {
if let Ok(enc) = enc.to_str() {
match ContentEncoding::from(enc) {
ContentEncoding::Auto | ContentEncoding::Identity => (),
enc => self.decompress = Some(PayloadStream::new(enc)),
}
}
}
}
}
Ok(Async::Ready(resp))
Ok(Async::Ready(resp))
}
val => val,
}
val => val,
} else {
Ok(Async::NotReady)
}
}
#[inline]
pub fn poll(&mut self) -> Poll<Option<Bytes>, PayloadError> {
if self.conn.is_none() {
return Ok(Async::Ready(None))
}
let conn: &mut Connection = unsafe{ mem::transmute(self.conn.as_mut().unwrap())};
let mut need_run = false;
// need write?
@@ -286,7 +302,7 @@ impl Pipeline {
if self.parser.is_some() {
loop {
match self.parser.as_mut().unwrap()
.parse_payload(&mut self.conn, &mut self.parser_buf)?
.parse_payload(conn, &mut self.parser_buf)?
{
Async::Ready(Some(b)) => {
if let Some(ref mut decompress) = self.decompress {
@@ -314,6 +330,7 @@ impl Pipeline {
if let Some(mut decompress) = self.decompress.take() {
let res = decompress.feed_eof();
if let Some(b) = res? {
self.release_conn();
return Ok(Async::Ready(Some(b)))
}
}
@@ -321,13 +338,14 @@ impl Pipeline {
if need_run {
Ok(Async::NotReady)
} else {
self.release_conn();
Ok(Async::Ready(None))
}
}
#[inline]
pub fn poll_write(&mut self) -> Poll<(), Error> {
if self.write_state == RunningState::Done {
fn poll_write(&mut self) -> Poll<(), Error> {
if self.write_state == RunningState::Done || self.conn.is_none() {
return Ok(Async::Ready(()))
}
@@ -416,7 +434,7 @@ impl Pipeline {
}
// flush io but only if we need to
match self.writer.poll_completed(&mut self.conn, false) {
match self.writer.poll_completed(self.conn.as_mut().unwrap(), false) {
Ok(Async::Ready(_)) => {
if self.disconnected {
self.write_state = RunningState::Done;

View File

@@ -599,3 +599,19 @@ fn parts<'a>(parts: &'a mut Option<ClientRequest>, err: &Option<HttpError>)
}
parts.as_mut()
}
impl fmt::Debug for ClientRequestBuilder {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if let Some(ref parts) = self.request {
let res = write!(f, "\nClientRequestBuilder {:?} {}:{}\n",
parts.version, parts.method, parts.uri);
let _ = write!(f, " headers:\n");
for (key, val) in parts.headers.iter() {
let _ = write!(f, " {:?}: {:?}\n", key, val);
}
res
} else {
write!(f, "ClientRequestBuilder(Consumed)")
}
}
}

View File

@@ -13,10 +13,11 @@ use http::header::{HeaderValue, DATE,
CONNECTION, CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING};
use flate2::Compression;
use flate2::write::{GzEncoder, DeflateEncoder};
#[cfg(feature="brotli")]
use brotli2::write::BrotliEncoder;
use body::{Body, Binary};
use headers::ContentEncoding;
use header::ContentEncoding;
use server::WriterState;
use server::shared::SharedBytes;
use server::encoding::{ContentEncoder, TransferEncoding};
@@ -112,7 +113,9 @@ impl HttpClientWriter {
// status line
let _ = write!(buffer, "{} {} {:?}\r\n",
msg.method(), msg.uri().path(), msg.version());
msg.method(),
msg.uri().path_and_query().map(|u| u.as_str()).unwrap_or("/"),
msg.version());
// write headers
for (key, value) in msg.headers() {
@@ -213,6 +216,7 @@ fn content_encoder(buf: SharedBytes, req: &mut ClientRequest) -> ContentEncoder
DeflateEncoder::new(transfer, Compression::default())),
ContentEncoding::Gzip => ContentEncoder::Gzip(
GzEncoder::new(transfer, Compression::default())),
#[cfg(feature="brotli")]
ContentEncoding::Br => ContentEncoder::Br(
BrotliEncoder::new(transfer, 5)),
ContentEncoding::Identity => ContentEncoder::Identity(transfer),
@@ -262,6 +266,7 @@ fn content_encoder(buf: SharedBytes, req: &mut ClientRequest) -> ContentEncoder
DeflateEncoder::new(transfer, Compression::default())),
ContentEncoding::Gzip => ContentEncoder::Gzip(
GzEncoder::new(transfer, Compression::default())),
#[cfg(feature="brotli")]
ContentEncoding::Br => ContentEncoder::Br(
BrotliEncoder::new(transfer, 5)),
ContentEncoding::Identity | ContentEncoding::Auto => ContentEncoder::Identity(transfer),

118
src/fs.rs
View File

@@ -21,11 +21,11 @@ use mime_guess::get_mime_type;
use header;
use error::Error;
use param::FromParam;
use handler::{Handler, Responder};
use handler::{Handler, RouteHandler, WrapHandler, Responder, Reply};
use httpmessage::HttpMessage;
use httprequest::HttpRequest;
use httpresponse::HttpResponse;
use httpcodes::{HttpOk, HttpFound, HttpMethodNotAllowed};
use httpcodes::{HttpOk, HttpFound, HttpNotFound, HttpMethodNotAllowed};
/// A file with an associated name; responds with the Content-Type based on the
/// file extension.
@@ -36,6 +36,7 @@ pub struct NamedFile {
md: Metadata,
modified: Option<SystemTime>,
cpu_pool: Option<CpuPool>,
only_get: bool,
}
impl NamedFile {
@@ -54,7 +55,14 @@ impl NamedFile {
let path = path.as_ref().to_path_buf();
let modified = md.modified().ok();
let cpu_pool = None;
Ok(NamedFile{path, file, md, modified, cpu_pool})
Ok(NamedFile{path, file, md, modified, cpu_pool, only_get: false})
}
/// Allow only GET and HEAD methods
#[inline]
pub fn only_get(mut self) -> Self {
self.only_get = true;
self
}
/// Returns reference to the underlying `File` object.
@@ -168,7 +176,7 @@ impl Responder for NamedFile {
type Error = io::Error;
fn respond_to(self, req: HttpRequest) -> Result<HttpResponse, io::Error> {
if *req.method() != Method::GET && *req.method() != Method::HEAD {
if self.only_get && *req.method() != Method::GET && *req.method() != Method::HEAD {
return Ok(HttpMethodNotAllowed.build()
.header(header::http::CONTENT_TYPE, "text/plain")
.header(header::http::ALLOW, "GET, HEAD")
@@ -215,7 +223,9 @@ impl Responder for NamedFile {
return Ok(resp.status(StatusCode::NOT_MODIFIED).finish().unwrap())
}
if *req.method() == Method::GET {
if *req.method() == Method::HEAD {
Ok(resp.finish().unwrap())
} else {
let reader = ChunkedReadFile {
size: self.md.len(),
offset: 0,
@@ -224,8 +234,6 @@ impl Responder for NamedFile {
fut: None,
};
Ok(resp.streaming(reader).unwrap())
} else {
Ok(resp.finish().unwrap())
}
}
}
@@ -354,27 +362,6 @@ impl Responder for Directory {
}
}
/// This enum represents all filesystem elements.
pub enum FilesystemElement {
File(NamedFile),
Directory(Directory),
Redirect(HttpResponse),
}
impl Responder for FilesystemElement {
type Item = HttpResponse;
type Error = io::Error;
fn respond_to(self, req: HttpRequest) -> Result<HttpResponse, io::Error> {
match self {
FilesystemElement::File(file) => file.respond_to(req),
FilesystemElement::Directory(dir) => dir.respond_to(req),
FilesystemElement::Redirect(resp) => Ok(resp),
}
}
}
/// Static files handling
///
/// `StaticFile` handler must be registered with `Application::handler()` method,
@@ -390,23 +377,24 @@ impl Responder for FilesystemElement {
/// .finish();
/// }
/// ```
pub struct StaticFiles {
pub struct StaticFiles<S> {
directory: PathBuf,
accessible: bool,
index: Option<String>,
show_index: bool,
cpu_pool: CpuPool,
default: Box<RouteHandler<S>>,
_chunk_size: usize,
_follow_symlinks: bool,
}
impl StaticFiles {
impl<S: 'static> StaticFiles<S> {
/// Create new `StaticFiles` instance
///
/// `dir` - base directory
///
/// `index` - show index for directory
pub fn new<T: Into<PathBuf>>(dir: T, index: bool) -> StaticFiles {
pub fn new<T: Into<PathBuf>>(dir: T, index: bool) -> StaticFiles<S> {
let dir = dir.into();
let (dir, access) = match dir.canonicalize() {
@@ -430,6 +418,7 @@ impl StaticFiles {
index: None,
show_index: index,
cpu_pool: CpuPool::new(40),
default: Box::new(WrapHandler::new(|_| HttpNotFound)),
_chunk_size: 0,
_follow_symlinks: false,
}
@@ -439,28 +428,30 @@ impl StaticFiles {
///
/// Redirects to specific index file for directory "/" instead of
/// showing files listing.
pub fn index_file<T: Into<String>>(mut self, index: T) -> StaticFiles {
pub fn index_file<T: Into<String>>(mut self, index: T) -> StaticFiles<S> {
self.index = Some(index.into());
self
}
/// Sets default resource which is used when no matched file could be found.
pub fn default_handler<H: Handler<S>>(mut self, handler: H) -> StaticFiles<S> {
self.default = Box::new(WrapHandler::new(handler));
self
}
}
impl<S> Handler<S> for StaticFiles {
type Result = Result<FilesystemElement, io::Error>;
impl<S: 'static> Handler<S> for StaticFiles<S> {
type Result = Result<Reply, Error>;
fn handle(&mut self, req: HttpRequest<S>) -> Self::Result {
if !self.accessible {
Err(io::Error::new(io::ErrorKind::NotFound, "not found"))
Ok(self.default.handle(req))
} else {
let path = if let Some(path) = req.match_info().get("tail") {
path
} else {
return Err(io::Error::new(io::ErrorKind::NotFound, "not found"))
let relpath = match req.match_info().get("tail").map(PathBuf::from_param) {
Some(Ok(path)) => path,
_ => return Ok(self.default.handle(req))
};
let relpath = PathBuf::from_param(path)
.map_err(|_| io::Error::new(io::ErrorKind::NotFound, "not found"))?;
// full filepath
let path = self.directory.join(&relpath).canonicalize()?;
@@ -474,20 +465,21 @@ impl<S> Handler<S> for StaticFiles {
new_path.push('/');
}
new_path.push_str(redir_index);
Ok(FilesystemElement::Redirect(
HttpFound
.build()
.header(header::http::LOCATION, new_path.as_str())
.finish().unwrap()))
HttpFound.build()
.header(header::http::LOCATION, new_path.as_str())
.finish().unwrap()
.respond_to(req.without_state())
} else if self.show_index {
Ok(FilesystemElement::Directory(
Directory::new(self.directory.clone(), path)))
Directory::new(self.directory.clone(), path)
.respond_to(req.without_state())?
.respond_to(req.without_state())
} else {
Err(io::Error::new(io::ErrorKind::NotFound, "not found"))
Ok(self.default.handle(req))
}
} else {
Ok(FilesystemElement::File(
NamedFile::open(path)?.set_cpu_pool(self.cpu_pool.clone())))
NamedFile::open(path)?.set_cpu_pool(self.cpu_pool.clone())
.respond_to(req.without_state())?
.respond_to(req.without_state())
}
}
}
@@ -517,25 +509,38 @@ mod tests {
let req = TestRequest::default().method(Method::POST).finish();
let file = NamedFile::open("Cargo.toml").unwrap();
let resp = file.respond_to(req).unwrap();
let resp = file.only_get().respond_to(req).unwrap();
assert_eq!(resp.status(), StatusCode::METHOD_NOT_ALLOWED);
}
#[test]
fn test_named_file_any_method() {
let req = TestRequest::default().method(Method::POST).finish();
let file = NamedFile::open("Cargo.toml").unwrap();
let resp = file.respond_to(req).unwrap();
assert_eq!(resp.status(), StatusCode::OK);
}
#[test]
fn test_static_files() {
let mut st = StaticFiles::new(".", true);
st.accessible = false;
assert!(st.handle(HttpRequest::default()).is_err());
let resp = st.handle(HttpRequest::default()).respond_to(HttpRequest::default()).unwrap();
let resp = resp.as_response().expect("HTTP Response");
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
st.accessible = true;
st.show_index = false;
assert!(st.handle(HttpRequest::default()).is_err());
let resp = st.handle(HttpRequest::default()).respond_to(HttpRequest::default()).unwrap();
let resp = resp.as_response().expect("HTTP Response");
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
let mut req = HttpRequest::default();
req.match_info_mut().add("tail", "");
st.show_index = true;
let resp = st.handle(req).respond_to(HttpRequest::default()).unwrap();
let resp = resp.as_response().expect("HTTP Response");
assert_eq!(resp.headers().get(header::CONTENT_TYPE).unwrap(), "text/html; charset=utf-8");
assert!(resp.body().is_binary());
assert!(format!("{:?}", resp.body()).contains("README.md"));
@@ -548,6 +553,7 @@ mod tests {
req.match_info_mut().add("tail", "guide");
let resp = st.handle(req).respond_to(HttpRequest::default()).unwrap();
let resp = resp.as_response().expect("HTTP Response");
assert_eq!(resp.status(), StatusCode::FOUND);
assert_eq!(resp.headers().get(header::LOCATION).unwrap(), "/guide/index.html");
@@ -555,6 +561,7 @@ mod tests {
req.match_info_mut().add("tail", "guide/");
let resp = st.handle(req).respond_to(HttpRequest::default()).unwrap();
let resp = resp.as_response().expect("HTTP Response");
assert_eq!(resp.status(), StatusCode::FOUND);
assert_eq!(resp.headers().get(header::LOCATION).unwrap(), "/guide/index.html");
}
@@ -566,6 +573,7 @@ mod tests {
req.match_info_mut().add("tail", "examples/basics");
let resp = st.handle(req).respond_to(HttpRequest::default()).unwrap();
let resp = resp.as_response().expect("HTTP Response");
assert_eq!(resp.status(), StatusCode::FOUND);
assert_eq!(resp.headers().get(header::LOCATION).unwrap(), "/examples/basics/Cargo.toml");
}

View File

@@ -119,6 +119,7 @@ pub enum ContentEncoding {
/// Automatically select encoding based on encoding negotiation
Auto,
/// A format using the Brotli algorithm
#[cfg(feature="brotli")]
Br,
/// A format using the zlib structure with deflate algorithm
Deflate,
@@ -141,6 +142,7 @@ impl ContentEncoding {
#[inline]
pub fn as_str(&self) -> &'static str {
match *self {
#[cfg(feature="brotli")]
ContentEncoding::Br => "br",
ContentEncoding::Gzip => "gzip",
ContentEncoding::Deflate => "deflate",
@@ -150,6 +152,7 @@ impl ContentEncoding {
/// default quality value
pub fn quality(&self) -> f64 {
match *self {
#[cfg(feature="brotli")]
ContentEncoding::Br => 1.1,
ContentEncoding::Gzip => 1.0,
ContentEncoding::Deflate => 0.9,
@@ -162,6 +165,7 @@ impl ContentEncoding {
impl<'a> From<&'a str> for ContentEncoding {
fn from(s: &'a str) -> ContentEncoding {
match s.trim().to_lowercase().as_ref() {
#[cfg(feature="brotli")]
"br" => ContentEncoding::Br,
"gzip" => ContentEncoding::Gzip,
"deflate" => ContentEncoding::Deflate,

View File

@@ -79,6 +79,7 @@ extern crate libc;
extern crate serde;
extern crate serde_json;
extern crate flate2;
#[cfg(feature="brotli")]
extern crate brotli2;
extern crate encoding;
extern crate percent_encoding;

View File

@@ -5,6 +5,7 @@ use std::cell::RefCell;
use std::collections::VecDeque;
use bytes::{Bytes, BytesMut};
use futures::{Async, Poll, Stream};
use futures::task::{Task, current as current_task};
use error::PayloadError;
@@ -174,6 +175,7 @@ struct Inner {
need_read: bool,
items: VecDeque<Bytes>,
capacity: usize,
task: Option<Task>,
}
impl Inner {
@@ -186,6 +188,7 @@ impl Inner {
items: VecDeque::new(),
need_read: true,
capacity: MAX_BUFFER_SIZE,
task: None,
}
}
@@ -204,6 +207,9 @@ impl Inner {
self.len += data.len();
self.items.push_back(data);
self.need_read = self.len < self.capacity;
if let Some(task) = self.task.take() {
task.notify()
}
}
#[inline]
@@ -237,6 +243,12 @@ impl Inner {
if let Some(data) = self.items.pop_front() {
self.len -= data.len();
self.need_read = self.len < self.capacity;
#[cfg(not(test))]
{
if self.need_read && self.task.is_none() {
self.task = Some(current_task());
}
}
Ok(Async::Ready(Some(data)))
} else if let Some(err) = self.err.take() {
Err(err)
@@ -244,6 +256,12 @@ impl Inner {
Ok(Async::Ready(None))
} else {
self.need_read = true;
#[cfg(not(test))]
{
if self.task.is_none() {
self.task = Some(current_task());
}
}
Ok(Async::NotReady)
}
}

View File

@@ -481,11 +481,16 @@ impl<S: 'static, H> ProcessResponse<S, H> {
}
}
// always poll stream or actor for the first time
match self.resp.replace_body(Body::Empty) {
Body::Streaming(stream) =>
self.iostate = IOState::Payload(stream),
Body::Actor(ctx) =>
self.iostate = IOState::Actor(ctx),
Body::Streaming(stream) => {
self.iostate = IOState::Payload(stream);
continue
},
Body::Actor(ctx) => {
self.iostate = IOState::Actor(ctx);
continue
},
_ => (),
}

View File

@@ -3,6 +3,7 @@ use std::io::{Read, Write};
use std::fmt::Write as FmtWrite;
use std::str::FromStr;
use bytes::{Bytes, BytesMut, BufMut};
use http::{Version, Method, HttpTryFrom};
use http::header::{HeaderMap, HeaderValue,
ACCEPT_ENCODING, CONNECTION,
@@ -10,8 +11,8 @@ use http::header::{HeaderMap, HeaderValue,
use flate2::Compression;
use flate2::read::GzDecoder;
use flate2::write::{GzEncoder, DeflateDecoder, DeflateEncoder};
#[cfg(feature="brotli")]
use brotli2::write::{BrotliDecoder, BrotliEncoder};
use bytes::{Bytes, BytesMut, BufMut};
use header::ContentEncoding;
use body::{Body, Binary};
@@ -144,6 +145,7 @@ impl PayloadWriter for EncodedPayload {
pub(crate) enum Decoder {
Deflate(Box<DeflateDecoder<Writer>>),
Gzip(Option<Box<GzDecoder<Wrapper>>>),
#[cfg(feature="brotli")]
Br(Box<BrotliDecoder<Writer>>),
Identity,
}
@@ -214,6 +216,7 @@ pub(crate) struct PayloadStream {
impl PayloadStream {
pub fn new(enc: ContentEncoding) -> PayloadStream {
let dec = match enc {
#[cfg(feature="brotli")]
ContentEncoding::Br => Decoder::Br(
Box::new(BrotliDecoder::new(Writer::new()))),
ContentEncoding::Deflate => Decoder::Deflate(
@@ -229,6 +232,7 @@ impl PayloadStream {
pub fn feed_eof(&mut self) -> io::Result<Option<Bytes>> {
match self.decoder {
#[cfg(feature="brotli")]
Decoder::Br(ref mut decoder) => {
match decoder.finish() {
Ok(mut writer) => {
@@ -278,6 +282,7 @@ impl PayloadStream {
pub fn feed_data(&mut self, data: Bytes) -> io::Result<Option<Bytes>> {
match self.decoder {
#[cfg(feature="brotli")]
Decoder::Br(ref mut decoder) => {
match decoder.write_all(&data) {
Ok(_) => {
@@ -346,6 +351,7 @@ impl PayloadStream {
pub(crate) enum ContentEncoder {
Deflate(DeflateEncoder<TransferEncoding>),
Gzip(GzEncoder<TransferEncoding>),
#[cfg(feature="brotli")]
Br(BrotliEncoder<TransferEncoding>),
Identity(TransferEncoding),
}
@@ -412,6 +418,7 @@ impl ContentEncoder {
DeflateEncoder::new(transfer, Compression::default())),
ContentEncoding::Gzip => ContentEncoder::Gzip(
GzEncoder::new(transfer, Compression::default())),
#[cfg(feature="brotli")]
ContentEncoding::Br => ContentEncoder::Br(
BrotliEncoder::new(transfer, 5)),
ContentEncoding::Identity => ContentEncoder::Identity(transfer),
@@ -464,6 +471,7 @@ impl ContentEncoder {
DeflateEncoder::new(transfer, Compression::default())),
ContentEncoding::Gzip => ContentEncoder::Gzip(
GzEncoder::new(transfer, Compression::default())),
#[cfg(feature="brotli")]
ContentEncoding::Br => ContentEncoder::Br(
BrotliEncoder::new(transfer, 5)),
ContentEncoding::Identity | ContentEncoding::Auto =>
@@ -538,6 +546,7 @@ impl ContentEncoder {
#[inline]
pub fn is_eof(&self) -> bool {
match *self {
#[cfg(feature="brotli")]
ContentEncoder::Br(ref encoder) => encoder.get_ref().is_eof(),
ContentEncoder::Deflate(ref encoder) => encoder.get_ref().is_eof(),
ContentEncoder::Gzip(ref encoder) => encoder.get_ref().is_eof(),
@@ -552,6 +561,7 @@ impl ContentEncoder {
self, ContentEncoder::Identity(TransferEncoding::eof(SharedBytes::empty())));
match encoder {
#[cfg(feature="brotli")]
ContentEncoder::Br(encoder) => {
match encoder.finish() {
Ok(mut writer) => {
@@ -594,6 +604,7 @@ impl ContentEncoder {
#[inline(always)]
pub fn write(&mut self, data: Binary) -> Result<(), io::Error> {
match *self {
#[cfg(feature="brotli")]
ContentEncoder::Br(ref mut encoder) => {
match encoder.write_all(data.as_ref()) {
Ok(_) => Ok(()),

View File

@@ -32,8 +32,10 @@ const MAX_PIPELINED_MESSAGES: usize = 16;
bitflags! {
struct Flags: u8 {
const STARTED = 0b0000_0001;
const ERROR = 0b0000_0010;
const KEEPALIVE = 0b0000_0100;
const SHUTDOWN = 0b0000_1000;
}
}
@@ -94,17 +96,32 @@ impl<T, H> Http1<T, H>
match timer.poll() {
Ok(Async::Ready(_)) => {
trace!("Keep-alive timeout, close connection");
return Ok(Async::Ready(()))
self.flags.insert(Flags::SHUTDOWN);
}
Ok(Async::NotReady) => (),
Err(_) => unreachable!(),
}
}
// shutdown
if self.flags.contains(Flags::SHUTDOWN) {
match self.stream.poll_completed(true) {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(_)) => return Ok(Async::Ready(())),
Err(err) => {
debug!("Error sending data: {}", err);
return Err(())
}
}
}
loop {
match self.poll_io()? {
Async::Ready(true) => (),
Async::Ready(false) => return Ok(Async::Ready(())),
Async::Ready(false) => {
self.flags.insert(Flags::SHUTDOWN);
return self.poll()
},
Async::NotReady => return Ok(Async::NotReady),
}
}
@@ -120,6 +137,8 @@ impl<T, H> Http1<T, H>
match self.reader.parse(self.stream.get_mut(),
&mut self.read_buf, &self.settings) {
Ok(Async::Ready(mut req)) => {
self.flags.insert(Flags::STARTED);
// set remote addr
req.set_peer_addr(self.addr);
@@ -260,21 +279,24 @@ impl<T, H> Http1<T, H>
}
// check stream state
match self.stream.poll_completed(self.tasks.is_empty()) {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => {
debug!("Error sending data: {}", err);
return Err(())
if self.flags.contains(Flags::STARTED) {
match self.stream.poll_completed(false) {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => {
debug!("Error sending data: {}", err);
return Err(())
}
_ => (),
}
_ => (),
}
// deal with keep-alive
if self.tasks.is_empty() {
// no keep-alive situations
if self.flags.contains(Flags::ERROR)
if (self.flags.contains(Flags::ERROR)
|| !self.flags.contains(Flags::KEEPALIVE)
|| !self.settings.keep_alive_enabled()
|| !self.settings.keep_alive_enabled()) &&
self.flags.contains(Flags::STARTED)
{
return Ok(Async::Ready(false))
}

View File

@@ -41,6 +41,7 @@ pub struct HttpServer<H> where H: IntoHttpHandler + 'static
exit: bool,
shutdown_timeout: u16,
signals: Option<Addr<Syn, signal::ProcessSignals>>,
no_http2: bool,
no_signals: bool,
}
@@ -89,6 +90,7 @@ impl<H> HttpServer<H> where H: IntoHttpHandler + 'static
exit: false,
shutdown_timeout: 30,
signals: None,
no_http2: false,
no_signals: false,
}
}
@@ -170,6 +172,12 @@ impl<H> HttpServer<H> where H: IntoHttpHandler + 'static
self
}
/// Disable `HTTP/2` support
pub fn no_http2(mut self) -> Self {
self.no_http2 = true;
self
}
/// Get addresses of bound sockets.
pub fn addrs(&self) -> Vec<net::SocketAddr> {
self.sockets.iter().map(|s| s.0).collect()
@@ -396,15 +404,17 @@ impl<H: IntoHttpHandler> HttpServer<H>
Err(io::Error::new(io::ErrorKind::Other, "No socket addresses are bound"))
} else {
// alpn support
builder.set_alpn_protos(b"\x02h2\x08http/1.1")?;
builder.set_alpn_select_callback(|_, protos| {
const H2: &[u8] = b"\x02h2";
if protos.windows(3).any(|window| window == H2) {
Ok(b"h2")
} else {
Err(AlpnError::NOACK)
}
});
if !self.no_http2 {
builder.set_alpn_protos(b"\x02h2\x08http/1.1")?;
builder.set_alpn_select_callback(|_, protos| {
const H2: &[u8] = b"\x02h2";
if protos.windows(3).any(|window| window == H2) {
Ok(b"h2")
} else {
Err(AlpnError::NOACK)
}
});
}
let (tx, rx) = mpsc::unbounded();
let acceptor = builder.build();
@@ -734,6 +744,13 @@ fn start_accept_thread(
workers[next].0, info.clone()));
msg = err.into_inner();
workers.swap_remove(next);
if workers.is_empty() {
error!("No workers");
thread::sleep(sleep);
break
} else if workers.len() <= next {
next = 0;
}
continue
}
}

View File

@@ -204,7 +204,8 @@ impl StreamHandlerType {
} else {
false
};
Arbiter::handle().spawn(HttpChannel::new(h, io, peer, http2));
Arbiter::handle().spawn(
HttpChannel::new(h, io, peer, http2));
},
Err(err) =>
trace!("Error during handling tls connection: {}", err),

View File

@@ -87,9 +87,12 @@ impl TestServer {
let sys = System::new("actix-test-server");
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
let local_addr = tcp.local_addr().unwrap();
let tcp = TcpListener::from_listener(tcp, &local_addr, Arbiter::handle()).unwrap();
let tcp = TcpListener::from_listener(
tcp, &local_addr, Arbiter::handle()).unwrap();
HttpServer::new(factory).disable_signals().start_incoming(tcp.incoming(), false);
HttpServer::new(factory)
.disable_signals()
.start_incoming(tcp.incoming(), false);
tx.send((Arbiter::system(), local_addr)).unwrap();
let _ = sys.run();

View File

@@ -66,6 +66,21 @@ fn test_simple() {
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
}
#[test]
fn test_with_query_parameter() {
let mut srv = test::TestServer::new(
|app| app.handler(|req: HttpRequest| match req.query().get("qp") {
Some(_) => httpcodes::HTTPOk.build().finish(),
None => httpcodes::HTTPBadRequest.build().finish(),
}));
let request = srv.get().uri(srv.url("/?qp=5").as_str()).finish().unwrap();
let response = srv.execute(request.send()).unwrap();
assert!(response.status().is_success());
}
#[test]
fn test_no_decompress() {
let mut srv = test::TestServer::new(

View File

@@ -743,7 +743,7 @@ fn test_h2() {
})
});
let _res = core.run(tcp);
// assert_eq!(_res.unwrap(), Bytes::from_static(STR.as_ref()));
// assert_eq!(res.unwrap(), Bytes::from_static(STR.as_ref()));
}
#[test]