1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-07-25 02:05:37 +02:00

Compare commits

...

34 Commits

Author SHA1 Message Date
Nikolay Kim
2227120ae0 exclude examples 2018-01-21 09:09:19 -08:00
Nikolay Kim
21c8c0371d travis config 2018-01-21 08:50:29 -08:00
Nikolay Kim
1914a6a0d8 Always enable content encoding if encoding explicitly selected 2018-01-21 08:31:46 -08:00
Nikolay Kim
1cff4619e7 reduce threshold for content encoding 2018-01-21 08:12:32 -08:00
Nikolay Kim
7bb7adf89c relax InternalError constraints 2018-01-20 22:02:42 -08:00
Nikolay Kim
f55ff24925 fix guide example 2018-01-20 21:40:18 -08:00
Nikolay Kim
f5f78d79e6 update doc strings 2018-01-20 21:16:31 -08:00
Nikolay Kim
9180625dfd refactor helper error types 2018-01-20 21:11:46 -08:00
Nikolay Kim
552320bae2 add error logging guide section 2018-01-20 20:21:01 -08:00
Nikolay Kim
7cf221f767 Log request processing errors 2018-01-20 20:12:24 -08:00
Nikolay Kim
98931a8623 test case for broken transfer encoding 2018-01-20 16:51:18 -08:00
Nikolay Kim
ae10a89014 use ws masking from tungstenite project 2018-01-20 16:47:34 -08:00
Nikolay Kim
71d534dadb CORS middleware: allowed_headers is defaulting to None #50 2018-01-20 16:36:57 -08:00
Nikolay Kim
867bb1d409 Merge branch 'master' of github.com:actix/actix-web 2018-01-20 16:12:51 -08:00
Nikolay Kim
91c44a1cf1 Fix HEAD requests handling 2018-01-20 16:12:38 -08:00
Nikolay Kim
3bc60a8d5d Merge pull request #53 from andreevlex/spelling-check-2
spelling check
2018-01-16 12:07:58 -08:00
Alexander Andreev
58df8fa4b9 spelling check 2018-01-16 21:59:33 +03:00
Nikolay Kim
81f92b43e5 Merge pull request #52 from andreevlex/spelling-check
spelling check
2018-01-15 14:16:54 -08:00
Alexander Andreev
e1d9c3803b spelling check 2018-01-16 00:47:25 +03:00
Nikolay Kim
a7c24aace1 flush is useless 2018-01-14 19:28:34 -08:00
Nikolay Kim
89a89e7b18 refactor shared bytes api 2018-01-14 17:00:28 -08:00
Nikolay Kim
3425f7be40 fix tests 2018-01-14 14:58:58 -08:00
Nikolay Kim
09a6f8a34f disable alpn feature 2018-01-14 14:44:32 -08:00
Nikolay Kim
7060f298b4 use more binary 2018-01-14 14:40:39 -08:00
Nikolay Kim
33dbe15760 use Binary for writer trait 2018-01-14 13:50:38 -08:00
Nikolay Kim
e95c7dfc29 use local actix-web for examples 2018-01-13 19:04:07 -08:00
Nikolay Kim
927a92fcac impl HttpHandler for Box<HttpHandler> and add helper method Application::boxed() #49 2018-01-13 18:58:17 -08:00
Nikolay Kim
2b0f3d2a9a prepare release 2018-01-13 16:57:01 -08:00
Nikolay Kim
93fdb596d4 Allow to explicitly disable chunked encoding 2018-01-13 16:17:33 -08:00
Nikolay Kim
305666067e Do not enable chunked encoding for HTTP/1.0 2018-01-13 12:46:43 -08:00
Nikolay Kim
b805d87ee7 no need for custom cookie module 2018-01-13 11:33:42 -08:00
Nikolay Kim
bc6bb9984f user guide spelling 2018-01-13 11:17:48 -08:00
Nikolay Kim
c043fd7912 Merge pull request #47 from belltoy/master
fix directory entry path
2018-01-13 11:16:53 -08:00
belltoy
781282897a fix directory entry path 2018-01-13 08:37:27 +00:00
56 changed files with 979 additions and 595 deletions

View File

@@ -46,7 +46,9 @@ script:
cargo clean cargo clean
USE_SKEPTIC=1 cargo test --features=alpn USE_SKEPTIC=1 cargo test --features=alpn
else else
cargo test --features=alpn cargo clean
cargo test
# --features=alpn
fi fi
- | - |

View File

@@ -1,5 +1,26 @@
# Changes # Changes
## 0.3.2 (2018-01-21)
* Fix HEAD requests handling
* Log request processing errors
* Always enable content encoding if encoding explicitly selected
* Allow multiple Applications on a single server with different state #49
* CORS middleware: allowed_headers is defaulting to None #50
## 0.3.1 (2018-01-13)
* Fix directory entry path #47
* Do not enable chunked encoding for HTTP/1.0
* Allow explicitly disable chunked encoding
## 0.3.0 (2018-01-12) ## 0.3.0 (2018-01-12)

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "actix-web" name = "actix-web"
version = "0.3.0" version = "0.3.2"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix web framework" description = "Actix web framework"
readme = "README.md" readme = "README.md"
@@ -11,7 +11,8 @@ documentation = "https://docs.rs/actix-web/"
categories = ["network-programming", "asynchronous", categories = ["network-programming", "asynchronous",
"web-programming::http-server", "web-programming::websocket"] "web-programming::http-server", "web-programming::websocket"]
license = "MIT/Apache-2.0" license = "MIT/Apache-2.0"
exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"] exclude = [".gitignore", ".travis.yml", ".cargo/config",
"appveyor.yml", "/examples/**"]
build = "build.rs" build = "build.rs"
[badges] [badges]
@@ -57,9 +58,6 @@ num_cpus = "1.0"
flate2 = "1.0" flate2 = "1.0"
cookie = { version="0.10", features=["percent-encode", "secure"] } cookie = { version="0.10", features=["percent-encode", "secure"] }
# ring nightly compilation bug
# cookie = { git="https://github.com/alexcrichton/cookie-rs.git", features=["percent-encode", "secure"] }
# io # io
mio = "0.6" mio = "0.6"
net2 = "0.2" net2 = "0.2"
@@ -83,7 +81,7 @@ version = "0.9"
optional = true optional = true
[dev-dependencies] [dev-dependencies]
env_logger = "0.4" env_logger = "0.5"
reqwest = "0.8" reqwest = "0.8"
skeptic = "0.13" skeptic = "0.13"
serde_derive = "1.0" serde_derive = "1.0"

View File

@@ -6,6 +6,6 @@ workspace = "../.."
[dependencies] [dependencies]
futures = "*" futures = "*"
env_logger = "0.4" env_logger = "0.5"
actix = "0.4" actix = "0.4"
actix-web = { path = "../../" } actix-web = { path="../.." }

View File

@@ -7,6 +7,7 @@ extern crate env_logger;
extern crate futures; extern crate futures;
use futures::Stream; use futures::Stream;
use std::{io, env};
use actix_web::*; use actix_web::*;
use actix_web::middleware::RequestSession; use actix_web::middleware::RequestSession;
use futures::future::{FutureResult, result}; use futures::future::{FutureResult, result};
@@ -56,17 +57,17 @@ fn index(mut req: HttpRequest) -> Result<HttpResponse> {
fn p404(req: HttpRequest) -> Result<HttpResponse> { fn p404(req: HttpRequest) -> Result<HttpResponse> {
// html // html
let html = format!(r#"<!DOCTYPE html><html><head><title>actix - basics</title><link rel="shortcut icon" type="image/x-icon" href="/favicon.ico" /></head> let html = r#"<!DOCTYPE html><html><head><title>actix - basics</title><link rel="shortcut icon" type="image/x-icon" href="/favicon.ico" /></head>
<body> <body>
<a href="index.html">back to home</a> <a href="index.html">back to home</a>
<h1>404</h1> <h1>404</h1>
</body> </body>
</html>"#); </html>"#;
// response // response
Ok(HttpResponse::build(StatusCode::NOT_FOUND) Ok(HttpResponse::build(StatusCode::NOT_FOUND)
.content_type("text/html; charset=utf-8") .content_type("text/html; charset=utf-8")
.body(&html).unwrap()) .body(html).unwrap())
} }
@@ -92,8 +93,9 @@ fn with_param(req: HttpRequest) -> Result<HttpResponse>
} }
fn main() { fn main() {
::std::env::set_var("RUST_LOG", "actix_web=info"); env::set_var("RUST_LOG", "actix_web=debug");
let _ = env_logger::init(); env::set_var("RUST_BACKTRACE", "1");
env_logger::init();
let sys = actix::System::new("basic-example"); let sys = actix::System::new("basic-example");
let addr = HttpServer::new( let addr = HttpServer::new(
@@ -121,6 +123,9 @@ fn main() {
_ => httpcodes::HTTPNotFound, _ => httpcodes::HTTPNotFound,
} }
})) }))
.resource("/error.html", |r| r.f(|req| {
error::ErrorBadRequest(io::Error::new(io::ErrorKind::Other, "test"))
}))
// static files // static files
.handler("/static/", fs::StaticFiles::new("../static/", true)) .handler("/static/", fs::StaticFiles::new("../static/", true))
// redirect // redirect

View File

@@ -5,9 +5,9 @@ authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
workspace = "../.." workspace = "../.."
[dependencies] [dependencies]
env_logger = "0.4" env_logger = "0.5"
actix = "0.4" actix = "0.4"
actix-web = { git = "https://github.com/actix/actix-web" } actix-web = { path = "../../" }
futures = "0.1" futures = "0.1"
uuid = { version = "0.5", features = ["serde", "v4"] } uuid = { version = "0.5", features = ["serde", "v4"] }

View File

@@ -8,7 +8,7 @@ use diesel::prelude::*;
use models; use models;
use schema; use schema;
/// This is db executor actor. We are going to run 3 of them in parallele. /// This is db executor actor. We are going to run 3 of them in parallel.
pub struct DbExecutor(pub SqliteConnection); pub struct DbExecutor(pub SqliteConnection);
/// This is only message that this actor can handle, but it is easy to extend number of /// This is only message that this actor can handle, but it is easy to extend number of

View File

@@ -5,6 +5,6 @@ authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
workspace = "../.." workspace = "../.."
[dependencies] [dependencies]
env_logger = "0.4" env_logger = "0.5"
actix = "0.4" actix = "0.4"
actix-web = { path = "../../" } actix-web = { path = "../../" }

View File

@@ -15,4 +15,4 @@ serde_derive = "1.0"
json = "*" json = "*"
actix = "0.4" actix = "0.4"
actix-web = { git = "https://github.com/actix/actix-web" } actix-web = { path="../../" }

View File

@@ -12,4 +12,4 @@ path = "src/main.rs"
env_logger = "*" env_logger = "*"
futures = "0.1" futures = "0.1"
actix = "0.4" actix = "0.4"
actix-web = { git = "https://github.com/actix/actix-web" } actix-web = { path="../../" }

View File

@@ -6,6 +6,6 @@ workspace = "../.."
[dependencies] [dependencies]
futures = "*" futures = "*"
env_logger = "0.4" env_logger = "0.5"
actix = "0.4" actix = "0.4"
actix-web = { git = "https://github.com/actix/actix-web" } actix-web = { path = "../../" }

View File

@@ -1,5 +1,5 @@
#![cfg_attr(feature="cargo-clippy", allow(needless_pass_by_value))] #![cfg_attr(feature="cargo-clippy", allow(needless_pass_by_value))]
//! There are two level of statfulness in actix-web. Application has state //! There are two level of statefulness in actix-web. Application has state
//! that is shared across all handlers within same Application. //! that is shared across all handlers within same Application.
//! And individual handler can have state. //! And individual handler can have state.
@@ -33,7 +33,7 @@ struct MyWebSocket {
} }
impl Actor for MyWebSocket { impl Actor for MyWebSocket {
type Context = HttpContext<Self, AppState>; type Context = ws::WebsocketContext<Self, AppState>;
} }
impl Handler<ws::Message> for MyWebSocket { impl Handler<ws::Message> for MyWebSocket {
@@ -43,9 +43,9 @@ impl Handler<ws::Message> for MyWebSocket {
self.counter += 1; self.counter += 1;
println!("WS({}): {:?}", self.counter, msg); println!("WS({}): {:?}", self.counter, msg);
match msg { match msg {
ws::Message::Ping(msg) => ws::WsWriter::pong(ctx, &msg), ws::Message::Ping(msg) => ctx.pong(&msg),
ws::Message::Text(text) => ws::WsWriter::text(ctx, &text), ws::Message::Text(text) => ctx.text(&text),
ws::Message::Binary(bin) => ws::WsWriter::binary(ctx, bin), ws::Message::Binary(bin) => ctx.binary(bin),
ws::Message::Closed | ws::Message::Error => { ws::Message::Closed | ws::Message::Error => {
ctx.stop(); ctx.stop();
} }

View File

@@ -5,7 +5,7 @@ authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
workspace = "../.." workspace = "../.."
[dependencies] [dependencies]
env_logger = "0.4" env_logger = "0.5"
actix = "0.4" actix = "0.4"
actix-web = { git = "https://github.com/actix/actix-web" } actix-web = { path = "../../" }
tera = "*" tera = "*"

View File

@@ -9,6 +9,6 @@ name = "server"
path = "src/main.rs" path = "src/main.rs"
[dependencies] [dependencies]
env_logger = "0.4" env_logger = "0.5"
actix = "^0.4.2" actix = "^0.4.2"
actix-web = { git = "https://github.com/actix/actix-web", features=["alpn"] } actix-web = { path = "../../", features=["alpn"] }

View File

@@ -9,7 +9,7 @@ use std::io::Read;
use actix_web::*; use actix_web::*;
/// somple handle /// simple handle
fn index(req: HttpRequest) -> Result<HttpResponse> { fn index(req: HttpRequest) -> Result<HttpResponse> {
println!("{:?}", req); println!("{:?}", req);
Ok(httpcodes::HTTPOk Ok(httpcodes::HTTPOk

View File

@@ -26,4 +26,4 @@ serde_json = "1.0"
serde_derive = "1.0" serde_derive = "1.0"
actix = "^0.4.2" actix = "^0.4.2"
actix-web = { git = "https://github.com/actix/actix-web" } actix-web = { path="../../" }

View File

@@ -16,8 +16,8 @@ Chat server listens for incoming tcp connections. Server can access several type
* `\list` - list all available rooms * `\list` - list all available rooms
* `\join name` - join room, if room does not exist, create new one * `\join name` - join room, if room does not exist, create new one
* `\name name` - set session name * `\name name` - set session name
* `some message` - just string, send messsage to all peers in same room * `some message` - just string, send message to all peers in same room
* client has to send heartbeat `Ping` messages, if server does not receive a heartbeat message for 10 seconds connection gets droppped * client has to send heartbeat `Ping` messages, if server does not receive a heartbeat message for 10 seconds connection gets dropped
To start server use command: `cargo run --bin server` To start server use command: `cargo run --bin server`

View File

@@ -16,7 +16,7 @@ use codec::{ChatRequest, ChatResponse, ChatCodec};
#[derive(Message)] #[derive(Message)]
pub struct Message(pub String); pub struct Message(pub String);
/// `ChatSession` actor is responsible for tcp peer communitions. /// `ChatSession` actor is responsible for tcp peer communications.
pub struct ChatSession { pub struct ChatSession {
/// unique session id /// unique session id
id: usize, id: usize,
@@ -30,7 +30,7 @@ pub struct ChatSession {
impl Actor for ChatSession { impl Actor for ChatSession {
/// For tcp communication we are going to use `FramedContext`. /// For tcp communication we are going to use `FramedContext`.
/// It is convinient wrapper around `Framed` object from `tokio_io` /// It is convenient wrapper around `Framed` object from `tokio_io`
type Context = FramedContext<Self>; type Context = FramedContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) { fn started(&mut self, ctx: &mut Self::Context) {
@@ -149,7 +149,7 @@ impl ChatSession {
} }
/// Define tcp server that will accept incomint tcp connection and create /// Define tcp server that will accept incoming tcp connection and create
/// chat actors. /// chat actors.
pub struct TcpServer { pub struct TcpServer {
chat: SyncAddress<ChatServer>, chat: SyncAddress<ChatServer>,

View File

@@ -17,7 +17,7 @@ If you already have rustup installed, run this command to ensure you have the la
rustup update rustup update
``` ```
Actix web framework requies rust version 1.20 and up. Actix web framework requires rust version 1.20 and up.
## Running Examples ## Running Examples

View File

@@ -1,7 +1,7 @@
# Middlewares # Middlewares
Actix middlewares system allows to add additional behaviour to request/response processing. Actix middlewares system allows to add additional behavior to request/response processing.
Middleware can hook into incomnig request process and modify request or halt request Middleware can hook into incoming request process and modify request or halt request
processing and return response early. Also it can hook into response processing. processing and return response early. Also it can hook into response processing.
Typically middlewares involves in following actions: Typically middlewares involves in following actions:
@@ -12,9 +12,9 @@ Typically middlewares involves in following actions:
* Access external services (redis, logging, sessions) * Access external services (redis, logging, sessions)
Middlewares are registered for each application and get executed in same order as Middlewares are registered for each application and get executed in same order as
registraton order. In general, *middleware* is a type that implements registration order. In general, *middleware* is a type that implements
[*Middleware trait*](../actix_web/middlewares/trait.Middleware.html). Each method [*Middleware trait*](../actix_web/middlewares/trait.Middleware.html). Each method
in this trait has default implementation. Each method can return result immidietly in this trait has default implementation. Each method can return result immediately
or *future* object. or *future* object.
Here is example of simple middleware that adds request and response headers: Here is example of simple middleware that adds request and response headers:
@@ -148,7 +148,7 @@ fn main() {
## User sessions ## User sessions
Actix provides general solution for session management. Actix provides general solution for session management.
[*Session storage*](../actix_web/middleware/struct.SessionStorage.html) middleare can be [*Session storage*](../actix_web/middleware/struct.SessionStorage.html) middleware can be
use with different backend types to store session data in different backends. use with different backend types to store session data in different backends.
By default only cookie session backend is implemented. Other backend implementations By default only cookie session backend is implemented. Other backend implementations
could be added later. could be added later.
@@ -162,7 +162,7 @@ You need to pass a random value to the constructor of *CookieSessionBackend*.
This is private key for cookie session. When this value is changed, all session data is lost. This is private key for cookie session. When this value is changed, all session data is lost.
Note that whatever you write into your session is visible by the user (but not modifiable). Note that whatever you write into your session is visible by the user (but not modifiable).
In general case, you cretate In general case, you create
[*Session storage*](../actix_web/middleware/struct.SessionStorage.html) middleware [*Session storage*](../actix_web/middleware/struct.SessionStorage.html) middleware
and initializes it with specific backend implementation, like *CookieSessionBackend*. and initializes it with specific backend implementation, like *CookieSessionBackend*.
To access session data To access session data

View File

@@ -4,7 +4,7 @@ Actix web automatically upgrades connection to *HTTP/2.0* if possible.
## Negotiation ## Negotiation
*HTTP/2.0* protocol over tls without prior knowlage requires *HTTP/2.0* protocol over tls without prior knowledge requires
[tls alpn](https://tools.ietf.org/html/rfc7301). At the moment only [tls alpn](https://tools.ietf.org/html/rfc7301). At the moment only
`rust-openssl` has support. Turn on `alpn` feature to enable `alpn` negotiation. `rust-openssl` has support. Turn on `alpn` feature to enable `alpn` negotiation.
With enable `alpn` feature `HttpServer` provides With enable `alpn` feature `HttpServer` provides

View File

@@ -36,8 +36,9 @@ We can send `CreateUser` message to `DbExecutor` actor, and as result we get
```rust,ignore ```rust,ignore
impl Handler<CreateUser> for DbExecutor { impl Handler<CreateUser> for DbExecutor {
type Result = Result<User, Error>
fn handle(&mut self, msg: CreateUser, _: &mut Self::Context) -> Response<Self, CreateUser> fn handle(&mut self, msg: CreateUser, _: &mut Self::Context) -> Self::Result
{ {
use self::schema::users::dsl::*; use self::schema::users::dsl::*;
@@ -59,7 +60,7 @@ impl Handler<CreateUser> for DbExecutor {
.load::<models::User>(&self.0) .load::<models::User>(&self.0)
.expect("Error loading person"); .expect("Error loading person");
Self::reply(items.pop().unwrap()) Ok(items.pop().unwrap())
} }
} }
``` ```
@@ -77,7 +78,7 @@ struct State {
fn main() { fn main() {
let sys = actix::System::new("diesel-example"); let sys = actix::System::new("diesel-example");
// Start 3 parallele db executors // Start 3 parallel db executors
let addr = SyncArbiter::start(3, || { let addr = SyncArbiter::start(3, || {
DbExecutor(SqliteConnection::establish("test.db").unwrap()) DbExecutor(SqliteConnection::establish("test.db").unwrap())
}); });
@@ -94,7 +95,7 @@ fn main() {
} }
``` ```
And finally we can use address in a requst handler. We get message response And finally we can use address in a request handler. We get message response
asynchronously, so handler needs to return future object, also `Route::a()` needs to be asynchronously, so handler needs to return future object, also `Route::a()` needs to be
used for async handler registration. used for async handler registration.

View File

@@ -2,7 +2,7 @@
Actix web provides some primitives to build web servers and applications with Rust. Actix web provides some primitives to build web servers and applications with Rust.
It provides routing, middlewares, pre-processing of requests, and post-processing of responses, It provides routing, middlewares, pre-processing of requests, and post-processing of responses,
websocket protcol handling, multipart streams, etc. websocket protocol handling, multipart streams, etc.
All actix web server is built around `Application` instance. All actix web server is built around `Application` instance.
It is used for registering routes for resources, middlewares. It is used for registering routes for resources, middlewares.
@@ -10,9 +10,9 @@ Also it stores application specific state that is shared across all handlers
within same application. within same application.
Application acts as namespace for all routes, i.e all routes for specific application Application acts as namespace for all routes, i.e all routes for specific application
has same url path prefix. Application prefix always contains laading "/" slash. has same url path prefix. Application prefix always contains leading "/" slash.
If supplied prefix does not contain leading slash, it get inserted. If supplied prefix does not contain leading slash, it get inserted.
Prefix should consists of valud path segments. i.e for application with prefix `/app` Prefix should consists of value path segments. i.e for application with prefix `/app`
any request with following paths `/app`, `/app/` or `/app/test` would match, any request with following paths `/app`, `/app/` or `/app/test` would match,
but path `/application` would not match. but path `/application` would not match.

View File

@@ -2,7 +2,7 @@
[*HttpServer*](../actix_web/struct.HttpServer.html) type is responsible for [*HttpServer*](../actix_web/struct.HttpServer.html) type is responsible for
serving http requests. *HttpServer* accept application factory as a parameter, serving http requests. *HttpServer* accept application factory as a parameter,
Application factory must have `Send` + `Sync` bounderies. More about that in Application factory must have `Send` + `Sync` boundaries. More about that in
*multi-threading* section. To bind to specific socket address `bind()` must be used. *multi-threading* section. To bind to specific socket address `bind()` must be used.
This method could be called multiple times. To start http server one of the *start* This method could be called multiple times. To start http server one of the *start*
methods could be used. `start()` method start simple server, `start_tls()` or `start_ssl()` methods could be used. `start()` method start simple server, `start_tls()` or `start_ssl()`

View File

@@ -5,7 +5,7 @@ and [`ResponseError` trait](../actix_web/error/trait.ResponseError.html)
for handling handler's errors. for handling handler's errors.
Any error that implements `ResponseError` trait can be returned as error value. Any error that implements `ResponseError` trait can be returned as error value.
*Handler* can return *Result* object, actix by default provides *Handler* can return *Result* object, actix by default provides
`Responder` implemenation for compatible result object. Here is implementation `Responder` implementation for compatible result object. Here is implementation
definition: definition:
```rust,ignore ```rust,ignore
@@ -134,3 +134,18 @@ fn index(req: HttpRequest) -> Result<&'static str> {
``` ```
In this example *BAD REQUEST* response get generated for `MyError` error. In this example *BAD REQUEST* response get generated for `MyError` error.
## Error logging
Actix logs all errors with `WARN` log level. If log level set to `DEBUG`
and `RUST_BACKTRACE` is enabled, backtrace get logged. The Error type uses
cause's error backtrace if available, if the underlying failure does not provide
a backtrace, a new backtrace is constructed pointing to that conversion point
(rather than the origin of the error). This construction only happens if there
is no underlying backtrace; if it does have a backtrace no new backtrace is constructed.
You can enable backtrace and debug logging with following command:
```
>> RUST_BACKTRACE=1 RUST_LOG=actix_web=debug cargo run
```

View File

@@ -2,15 +2,15 @@
URL dispatch provides a simple way to map URLs to `Handler` code using a simple pattern matching URL dispatch provides a simple way to map URLs to `Handler` code using a simple pattern matching
language. *Regex* crate and it's language. *Regex* crate and it's
[*RegexSet*](https://doc.rust-lang.org/regex/regex/struct.RegexSet.html) is beeing used for [*RegexSet*](https://doc.rust-lang.org/regex/regex/struct.RegexSet.html) is being used for
pattern matching. If one of the patterns matches the path information associated with a request, pattern matching. If one of the patterns matches the path information associated with a request,
a particular handler object is invoked. A handler is a specific object that implements a particular handler object is invoked. A handler is a specific object that implements
`Handler` trait, defined in your application, that receives the request and returns `Handler` trait, defined in your application, that receives the request and returns
a response object. More informatin is available in [handler section](../qs_4.html). a response object. More information is available in [handler section](../qs_4.html).
## Resource configuration ## Resource configuration
Resource configuraiton is the act of adding a new resource to an application. Resource configuration is the act of adding a new resource to an application.
A resource has a name, which acts as an identifier to be used for URL generation. A resource has a name, which acts as an identifier to be used for URL generation.
The name also allows developers to add routes to existing resources. The name also allows developers to add routes to existing resources.
A resource also has a pattern, meant to match against the *PATH* portion of a *URL*, A resource also has a pattern, meant to match against the *PATH* portion of a *URL*,
@@ -19,7 +19,7 @@ port, e.g., */foo/bar* in the *URL* *http://localhost:8080/foo/bar?q=value*).
The [Application::resource](../actix_web/struct.Application.html#method.resource) methods The [Application::resource](../actix_web/struct.Application.html#method.resource) methods
add a single resource to application routing table. This method accepts *path pattern* add a single resource to application routing table. This method accepts *path pattern*
and resource configuration funnction. and resource configuration function.
```rust ```rust
# extern crate actix_web; # extern crate actix_web;
@@ -39,20 +39,20 @@ fn main() {
} }
``` ```
*Configuraiton function* has following type: *Configuration function* has following type:
```rust,ignore ```rust,ignore
FnOnce(&mut Resource<_>) -> () FnOnce(&mut Resource<_>) -> ()
``` ```
*Configration function* can set name and register specific routes. *Configuration function* can set name and register specific routes.
If resource does not contain any route or does not have any matching routes it If resource does not contain any route or does not have any matching routes it
returns *NOT FOUND* http resources. returns *NOT FOUND* http resources.
## Configuring a Route ## Configuring a Route
Resource contains set of routes. Each route in turn has set of predicates and handler. Resource contains set of routes. Each route in turn has set of predicates and handler.
New route could be crearted with `Resource::route()` method which returns reference New route could be created with `Resource::route()` method which returns reference
to new *Route* instance. By default *route* does not contain any predicates, so matches to new *Route* instance. By default *route* does not contain any predicates, so matches
all requests and default handler is `HTTPNotFound`. all requests and default handler is `HTTPNotFound`.
@@ -91,17 +91,17 @@ builder-like pattern. Following configuration methods are available:
any number of predicates could be registered for each route. any number of predicates could be registered for each route.
* [*Route::f()*](../actix_web/struct.Route.html#method.f) method registers handler function * [*Route::f()*](../actix_web/struct.Route.html#method.f) method registers handler function
for this route. Only one handler could be registered. Usually handler registeration for this route. Only one handler could be registered. Usually handler registration
is the last config operation. Handler fanction could be function or closure and has type is the last config operation. Handler function could be function or closure and has type
`Fn(HttpRequest<S>) -> R + 'static` `Fn(HttpRequest<S>) -> R + 'static`
* [*Route::h()*](../actix_web/struct.Route.html#method.h) method registers handler object * [*Route::h()*](../actix_web/struct.Route.html#method.h) method registers handler object
that implements `Handler` trait. This is similar to `f()` method, only one handler could that implements `Handler` trait. This is similar to `f()` method, only one handler could
be registered. Handler registeration is the last config operation. be registered. Handler registration is the last config operation.
* [*Route::a()*](../actix_web/struct.Route.html#method.a) method registers asynchandler * [*Route::a()*](../actix_web/struct.Route.html#method.a) method registers async handler
function for this route. Only one handler could be registered. Handler registeration function for this route. Only one handler could be registered. Handler registration
is the last config operation. Handler fanction could be function or closure and has type is the last config operation. Handler function could be function or closure and has type
`Fn(HttpRequest<S>) -> Future<Item = HttpResponse, Error = Error> + 'static` `Fn(HttpRequest<S>) -> Future<Item = HttpResponse, Error = Error> + 'static`
## Route matching ## Route matching
@@ -112,7 +112,7 @@ against a URL path pattern. `path` represents the path portion of the URL that w
The way that *actix* does this is very simple. When a request enters the system, The way that *actix* does this is very simple. When a request enters the system,
for each resource configuration registration present in the system, actix checks for each resource configuration registration present in the system, actix checks
the request's path against the pattern declared. *Regex* crate and it's the request's path against the pattern declared. *Regex* crate and it's
[*RegexSet*](https://doc.rust-lang.org/regex/regex/struct.RegexSet.html) is beeing used for [*RegexSet*](https://doc.rust-lang.org/regex/regex/struct.RegexSet.html) is being used for
pattern matching. If resource could not be found, *default resource* get used as matched pattern matching. If resource could not be found, *default resource* get used as matched
resource. resource.
@@ -516,7 +516,7 @@ Predicates can have access to application's state via `HttpRequest::state()` met
Also predicates can store extra information in Also predicates can store extra information in
[requests`s extensions](../actix_web/struct.HttpRequest.html#method.extensions). [requests`s extensions](../actix_web/struct.HttpRequest.html#method.extensions).
### Modifing predicate values ### Modifying predicate values
You can invert the meaning of any predicate value by wrapping it in a `Not` predicate. You can invert the meaning of any predicate value by wrapping it in a `Not` predicate.
For example if you want to return "METHOD NOT ALLOWED" response for all methods For example if you want to return "METHOD NOT ALLOWED" response for all methods

View File

@@ -4,7 +4,7 @@
Builder-like patter is used to construct an instance of `HttpResponse`. Builder-like patter is used to construct an instance of `HttpResponse`.
`HttpResponse` provides several method that returns `HttpResponseBuilder` instance, `HttpResponse` provides several method that returns `HttpResponseBuilder` instance,
which is implements various convinience methods that helps build response. which is implements various convenience methods that helps build response.
Check [documentation](../actix_web/dev/struct.HttpResponseBuilder.html) Check [documentation](../actix_web/dev/struct.HttpResponseBuilder.html)
for type description. Methods `.body`, `.finish`, `.json` finalizes response creation and for type description. Methods `.body`, `.finish`, `.json` finalizes response creation and
returns constructed *HttpResponse* instance. if this methods get called for the same returns constructed *HttpResponse* instance. if this methods get called for the same
@@ -91,7 +91,7 @@ fn index(mut req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>> {
# fn main() {} # fn main() {}
``` ```
Or you can manually load payload into memory and ther deserialize it. Or you can manually load payload into memory and then deserialize it.
Here is simple example. We will deserialize *MyObj* struct. We need to load request Here is simple example. We will deserialize *MyObj* struct. We need to load request
body first and then deserialize json into object. body first and then deserialize json into object.
@@ -200,7 +200,7 @@ fn index(req: HttpRequest) -> Box<Future<...>> {
match item { match item {
// Handle multipart Field // Handle multipart Field
multipart::MultipartItem::Field(field) => { multipart::MultipartItem::Field(field) => {
println!("==== FIELD ==== {:?} {:?}", field.heders(), field.content_type()); println!("==== FIELD ==== {:?} {:?}", field.headers(), field.content_type());
Either::A( Either::A(
// Field in turn is a stream of *Bytes* objects // Field in turn is a stream of *Bytes* objects
@@ -259,7 +259,7 @@ fn index(mut req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>> {
Actix uses [*Payload*](../actix_web/payload/struct.Payload.html) object as request payload stream. Actix uses [*Payload*](../actix_web/payload/struct.Payload.html) object as request payload stream.
*HttpRequest* provides several methods, which can be used for payload access. *HttpRequest* provides several methods, which can be used for payload access.
At the same time *Payload* implements *Stream* trait, so it could be used with various At the same time *Payload* implements *Stream* trait, so it could be used with various
stream combinators. Also *Payload* provides serveral convinience methods that return stream combinators. Also *Payload* provides several convenience methods that return
future object that resolve to Bytes object. future object that resolve to Bytes object.
* *readany()* method returns *Stream* of *Bytes* objects. * *readany()* method returns *Stream* of *Bytes* objects.
@@ -283,7 +283,7 @@ use futures::{Future, Stream};
fn index(mut req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>> { fn index(mut req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>> {
req.payload_mut() req.payload()
.readany() .readany()
.from_err() .from_err()
.fold((), |_, chunk| { .fold((), |_, chunk| {

View File

@@ -3,7 +3,7 @@
Actix supports WebSockets out-of-the-box. It is possible to convert request's `Payload` Actix supports WebSockets out-of-the-box. It is possible to convert request's `Payload`
to a stream of [*ws::Message*](../actix_web/ws/enum.Message.html) with to a stream of [*ws::Message*](../actix_web/ws/enum.Message.html) with
a [*ws::WsStream*](../actix_web/ws/struct.WsStream.html) and then use stream a [*ws::WsStream*](../actix_web/ws/struct.WsStream.html) and then use stream
combinators to handle actual messages. But it is simplier to handle websocket communications combinators to handle actual messages. But it is simpler to handle websocket communications
with http actor. with http actor.
This is example of simple websocket echo server: This is example of simple websocket echo server:

View File

@@ -59,9 +59,11 @@ impl<S: 'static> PipelineHandler<S> for Inner<S> {
#[cfg(test)] #[cfg(test)]
impl<S: 'static> HttpApplication<S> { impl<S: 'static> HttpApplication<S> {
#[cfg(test)]
pub(crate) fn run(&mut self, req: HttpRequest<S>) -> Reply { pub(crate) fn run(&mut self, req: HttpRequest<S>) -> Reply {
self.inner.borrow_mut().handle(req) self.inner.borrow_mut().handle(req)
} }
#[cfg(test)]
pub(crate) fn prepare_request(&self, req: HttpRequest) -> HttpRequest<S> { pub(crate) fn prepare_request(&self, req: HttpRequest) -> HttpRequest<S> {
req.with_state(Rc::clone(&self.state), self.router.clone()) req.with_state(Rc::clone(&self.state), self.router.clone())
} }
@@ -134,7 +136,7 @@ impl<S> Application<S> where S: 'static {
/// Create application with specific state. Application can be /// Create application with specific state. Application can be
/// configured with builder-like pattern. /// configured with builder-like pattern.
/// ///
/// State is shared with all reousrces within same application and could be /// State is shared with all resources within same application and could be
/// accessed with `HttpRequest::state()` method. /// accessed with `HttpRequest::state()` method.
pub fn with_state(state: S) -> Application<S> { pub fn with_state(state: S) -> Application<S> {
Application { Application {
@@ -154,7 +156,7 @@ impl<S> Application<S> where S: 'static {
/// Set application prefix /// Set application prefix
/// ///
/// Only requests that matches application's prefix get processed by this application. /// Only requests that matches application's prefix get processed by this application.
/// Application prefix always contains laading "/" slash. If supplied prefix /// Application prefix always contains leading "/" slash. If supplied prefix
/// does not contain leading slash, it get inserted. Prefix should /// does not contain leading slash, it get inserted. Prefix should
/// consists valid path segments. i.e for application with /// consists valid path segments. i.e for application with
/// prefix `/app` any request with following paths `/app`, `/app/` or `/app/test` /// prefix `/app` any request with following paths `/app`, `/app/` or `/app/test`
@@ -356,6 +358,40 @@ impl<S> Application<S> where S: 'static {
middlewares: Rc::new(parts.middlewares), middlewares: Rc::new(parts.middlewares),
} }
} }
/// Convenience method for creating `Box<HttpHandler>` instance.
///
/// This method is useful if you need to register several application instances
/// with different state.
///
/// ```rust
/// # use std::thread;
/// # extern crate actix_web;
/// use actix_web::*;
///
/// struct State1;
///
/// struct State2;
///
/// fn main() {
/// # thread::spawn(|| {
/// HttpServer::new(|| { vec![
/// Application::with_state(State1)
/// .prefix("/app1")
/// .resource("/", |r| r.h(httpcodes::HTTPOk))
/// .boxed(),
/// Application::with_state(State2)
/// .prefix("/app2")
/// .resource("/", |r| r.h(httpcodes::HTTPOk))
/// .boxed() ]})
/// .bind("127.0.0.1:8080").unwrap()
/// .run()
/// # });
/// }
/// ```
pub fn boxed(mut self) -> Box<HttpHandler> {
Box::new(self.finish())
}
} }
impl<S: 'static> IntoHttpHandler for Application<S> { impl<S: 'static> IntoHttpHandler for Application<S> {

View File

@@ -1,4 +1,4 @@
use std::fmt; use std::{fmt, mem};
use std::rc::Rc; use std::rc::Rc;
use std::sync::Arc; use std::sync::Arc;
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
@@ -31,7 +31,7 @@ pub enum Binary {
Bytes(Bytes), Bytes(Bytes),
/// Static slice /// Static slice
Slice(&'static [u8]), Slice(&'static [u8]),
/// Shared stirng body /// Shared string body
SharedString(Rc<String>), SharedString(Rc<String>),
/// Shared string body /// Shared string body
#[doc(hidden)] #[doc(hidden)]
@@ -122,6 +122,22 @@ impl Binary {
pub fn from_slice(s: &[u8]) -> Binary { pub fn from_slice(s: &[u8]) -> Binary {
Binary::Bytes(Bytes::from(s)) Binary::Bytes(Bytes::from(s))
} }
/// Convert Binary to a Bytes instance
pub fn take(&mut self) -> Bytes {
mem::replace(self, Binary::Slice(b"")).into()
}
}
impl Clone for Binary {
fn clone(&self) -> Binary {
match *self {
Binary::Bytes(ref bytes) => Binary::Bytes(bytes.clone()),
Binary::Slice(slice) => Binary::Bytes(Bytes::from(slice)),
Binary::SharedString(ref s) => Binary::Bytes(Bytes::from(s.as_str())),
Binary::ArcSharedString(ref s) => Binary::Bytes(Bytes::from(s.as_str())),
}
}
} }
impl Into<Bytes> for Binary { impl Into<Bytes> for Binary {

View File

@@ -1,9 +1,9 @@
use std; use std;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::collections::VecDeque;
use futures::{Async, Future, Poll}; use futures::{Async, Future, Poll};
use futures::sync::oneshot::Sender; use futures::sync::oneshot::Sender;
use futures::unsync::oneshot; use futures::unsync::oneshot;
use smallvec::SmallVec;
use actix::{Actor, ActorState, ActorContext, AsyncContext, use actix::{Actor, ActorState, ActorContext, AsyncContext,
Address, SyncAddress, Handler, Subscriber, ResponseType, SpawnHandle}; Address, SyncAddress, Handler, Subscriber, ResponseType, SpawnHandle};
@@ -18,7 +18,7 @@ use httprequest::HttpRequest;
pub trait ActorHttpContext: 'static { pub trait ActorHttpContext: 'static {
fn disconnected(&mut self); fn disconnected(&mut self);
fn poll(&mut self) -> Poll<Option<Frame>, Error>; fn poll(&mut self) -> Poll<Option<SmallVec<[Frame; 2]>>, Error>;
} }
#[derive(Debug)] #[derive(Debug)]
@@ -31,7 +31,7 @@ pub enum Frame {
pub struct HttpContext<A, S=()> where A: Actor<Context=HttpContext<A, S>>, pub struct HttpContext<A, S=()> where A: Actor<Context=HttpContext<A, S>>,
{ {
inner: ContextImpl<A>, inner: ContextImpl<A>,
stream: VecDeque<Frame>, stream: Option<SmallVec<[Frame; 2]>>,
request: HttpRequest<S>, request: HttpRequest<S>,
disconnected: bool, disconnected: bool,
} }
@@ -91,7 +91,7 @@ impl<A, S: 'static> HttpContext<A, S> where A: Actor<Context=Self> {
pub fn from_request(req: HttpRequest<S>) -> HttpContext<A, S> { pub fn from_request(req: HttpRequest<S>) -> HttpContext<A, S> {
HttpContext { HttpContext {
inner: ContextImpl::new(None), inner: ContextImpl::new(None),
stream: VecDeque::new(), stream: None,
request: req, request: req,
disconnected: false, disconnected: false,
} }
@@ -121,23 +121,23 @@ impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> {
#[inline] #[inline]
pub fn write<B: Into<Binary>>(&mut self, data: B) { pub fn write<B: Into<Binary>>(&mut self, data: B) {
if !self.disconnected { if !self.disconnected {
self.stream.push_back(Frame::Chunk(Some(data.into()))); self.add_frame(Frame::Chunk(Some(data.into())));
} else { } else {
warn!("Trying to write to disconnected response"); warn!("Trying to write to disconnected response");
} }
} }
/// Indicate end of streamimng payload. Also this method calls `Self::close`. /// Indicate end of streaming payload. Also this method calls `Self::close`.
#[inline] #[inline]
pub fn write_eof(&mut self) { pub fn write_eof(&mut self) {
self.stream.push_back(Frame::Chunk(None)); self.add_frame(Frame::Chunk(None));
} }
/// Returns drain future /// Returns drain future
pub fn drain(&mut self) -> Drain<A> { pub fn drain(&mut self) -> Drain<A> {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
self.inner.modify(); self.inner.modify();
self.stream.push_back(Frame::Drain(tx)); self.add_frame(Frame::Drain(tx));
Drain::new(rx) Drain::new(rx)
} }
@@ -146,6 +146,14 @@ impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> {
pub fn connected(&self) -> bool { pub fn connected(&self) -> bool {
!self.disconnected !self.disconnected
} }
#[inline]
fn add_frame(&mut self, frame: Frame) {
if self.stream.is_none() {
self.stream = Some(SmallVec::new());
}
self.stream.as_mut().map(|s| s.push(frame));
}
} }
impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> { impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> {
@@ -176,7 +184,7 @@ impl<A, S> ActorHttpContext for HttpContext<A, S> where A: Actor<Context=Self>,
self.stop(); self.stop();
} }
fn poll(&mut self) -> Poll<Option<Frame>, Error> { fn poll(&mut self) -> Poll<Option<SmallVec<[Frame; 2]>>, Error> {
let ctx: &mut HttpContext<A, S> = unsafe { let ctx: &mut HttpContext<A, S> = unsafe {
std::mem::transmute(self as &mut HttpContext<A, S>) std::mem::transmute(self as &mut HttpContext<A, S>)
}; };
@@ -189,8 +197,8 @@ impl<A, S> ActorHttpContext for HttpContext<A, S> where A: Actor<Context=Self>,
} }
// frames // frames
if let Some(frame) = self.stream.pop_front() { if let Some(data) = self.stream.take() {
Ok(Async::Ready(Some(frame))) Ok(Async::Ready(Some(data)))
} else if self.inner.alive() { } else if self.inner.alive() {
Ok(Async::NotReady) Ok(Async::NotReady)
} else { } else {

View File

@@ -9,8 +9,8 @@ use std::error::Error as StdError;
use cookie; use cookie;
use httparse; use httparse;
use failure::Fail;
use futures::Canceled; use futures::Canceled;
use failure::{Fail, Backtrace};
use http2::Error as Http2Error; use http2::Error as Http2Error;
use http::{header, StatusCode, Error as HttpError}; use http::{header, StatusCode, Error as HttpError};
use http::uri::InvalidUriBytes; use http::uri::InvalidUriBytes;
@@ -22,6 +22,8 @@ use url::ParseError as UrlParseError;
pub use cookie::{ParseError as CookieParseError}; pub use cookie::{ParseError as CookieParseError};
use body::Body; use body::Body;
use handler::Responder;
use httprequest::HttpRequest;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
use httpcodes::{HTTPBadRequest, HTTPMethodNotAllowed, HTTPExpectationFailed}; use httpcodes::{HTTPBadRequest, HTTPMethodNotAllowed, HTTPExpectationFailed};
@@ -33,9 +35,9 @@ use httpcodes::{HTTPBadRequest, HTTPMethodNotAllowed, HTTPExpectationFailed};
pub type Result<T, E=Error> = result::Result<T, E>; pub type Result<T, E=Error> = result::Result<T, E>;
/// General purpose actix web error /// General purpose actix web error
#[derive(Fail, Debug)]
pub struct Error { pub struct Error {
cause: Box<ResponseError>, cause: Box<ResponseError>,
backtrace: Option<Backtrace>,
} }
impl Error { impl Error {
@@ -64,6 +66,16 @@ impl fmt::Display for Error {
} }
} }
impl fmt::Debug for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if let Some(bt) = self.cause.backtrace() {
write!(f, "{:?}\n\n{:?}", &self.cause, bt)
} else {
write!(f, "{:?}\n\n{:?}", &self.cause, self.backtrace.as_ref().unwrap())
}
}
}
/// `HttpResponse` for `Error` /// `HttpResponse` for `Error`
impl From<Error> for HttpResponse { impl From<Error> for HttpResponse {
fn from(err: Error) -> Self { fn from(err: Error) -> Self {
@@ -74,7 +86,12 @@ impl From<Error> for HttpResponse {
/// `Error` for any error that implements `ResponseError` /// `Error` for any error that implements `ResponseError`
impl<T: ResponseError> From<T> for Error { impl<T: ResponseError> From<T> for Error {
fn from(err: T) -> Error { fn from(err: T) -> Error {
Error { cause: Box::new(err) } let backtrace = if err.backtrace().is_none() {
Some(Backtrace::new())
} else {
None
};
Error { cause: Box::new(err), backtrace: backtrace }
} }
} }
@@ -320,7 +337,7 @@ pub enum WsHandshakeError {
/// Only get method is allowed /// Only get method is allowed
#[fail(display="Method not allowed")] #[fail(display="Method not allowed")]
GetMethodRequired, GetMethodRequired,
/// Ugrade header if not set to websocket /// Upgrade header if not set to websocket
#[fail(display="Websocket upgrade is expected")] #[fail(display="Websocket upgrade is expected")]
NoWebsocketUpgrade, NoWebsocketUpgrade,
/// Connection header is not set to upgrade /// Connection header is not set to upgrade
@@ -329,7 +346,7 @@ pub enum WsHandshakeError {
/// Websocket version header is not set /// Websocket version header is not set
#[fail(display="Websocket version header is required")] #[fail(display="Websocket version header is required")]
NoVersionHeader, NoVersionHeader,
/// Unsupported websockt version /// Unsupported websocket version
#[fail(display="Unsupported version")] #[fail(display="Unsupported version")]
UnsupportedVersion, UnsupportedVersion,
/// Websocket key is not set or wrong /// Websocket key is not set or wrong
@@ -478,39 +495,10 @@ impl From<UrlParseError> for UrlGenerationError {
} }
} }
macro_rules! ERROR_WRAP { /// Helper type that can wrap any error and generate custom response.
($type:ty, $status:expr) => {
unsafe impl<T> Sync for $type {}
unsafe impl<T> Send for $type {}
impl<T> $type {
pub fn cause(&self) -> &T {
&self.0
}
}
impl<T: fmt::Debug + 'static> Fail for $type {}
impl<T: fmt::Debug + 'static> fmt::Display for $type {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{:?}", self.0)
}
}
impl<T> ResponseError for $type
where T: Send + Sync + fmt::Debug + 'static,
{
fn error_response(&self) -> HttpResponse {
HttpResponse::new($status, Body::Empty)
}
}
}
}
/// Helper type that can wrap any error and generate *BAD REQUEST* response.
/// ///
/// In following example any `io::Error` will be converted into "BAD REQUEST" response /// In following example any `io::Error` will be converted into "BAD REQUEST" response
/// as oposite to *INNTERNAL SERVER ERROR* which is defined by default. /// as opposite to *INNTERNAL SERVER ERROR* which is defined by default.
/// ///
/// ```rust /// ```rust
/// # extern crate actix_web; /// # extern crate actix_web;
@@ -523,59 +511,133 @@ macro_rules! ERROR_WRAP {
/// } /// }
/// # fn main() {} /// # fn main() {}
/// ``` /// ```
#[derive(Debug)] pub struct InternalError<T> {
pub struct ErrorBadRequest<T>(pub T); cause: T,
ERROR_WRAP!(ErrorBadRequest<T>, StatusCode::BAD_REQUEST); status: StatusCode,
backtrace: Backtrace,
}
#[derive(Debug)] unsafe impl<T> Sync for InternalError<T> {}
/// Helper type that can wrap any error and generate *UNAUTHORIZED* response. unsafe impl<T> Send for InternalError<T> {}
pub struct ErrorUnauthorized<T>(pub T);
ERROR_WRAP!(ErrorUnauthorized<T>, StatusCode::UNAUTHORIZED);
#[derive(Debug)] impl<T> InternalError<T> {
/// Helper type that can wrap any error and generate *FORBIDDEN* response. pub fn new(err: T, status: StatusCode) -> Self {
pub struct ErrorForbidden<T>(pub T); InternalError {
ERROR_WRAP!(ErrorForbidden<T>, StatusCode::FORBIDDEN); cause: err,
status: status,
backtrace: Backtrace::new(),
}
}
}
#[derive(Debug)] impl<T> Fail for InternalError<T>
/// Helper type that can wrap any error and generate *NOT FOUND* response. where T: Send + Sync + fmt::Debug + 'static
pub struct ErrorNotFound<T>(pub T); {
ERROR_WRAP!(ErrorNotFound<T>, StatusCode::NOT_FOUND); fn backtrace(&self) -> Option<&Backtrace> {
Some(&self.backtrace)
}
}
#[derive(Debug)] impl<T> fmt::Debug for InternalError<T>
/// Helper type that can wrap any error and generate *METHOD NOT ALLOWED* response. where T: Send + Sync + fmt::Debug + 'static
pub struct ErrorMethodNotAllowed<T>(pub T); {
ERROR_WRAP!(ErrorMethodNotAllowed<T>, StatusCode::METHOD_NOT_ALLOWED); fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&self.cause, f)
}
}
#[derive(Debug)] impl<T> fmt::Display for InternalError<T>
/// Helper type that can wrap any error and generate *REQUEST TIMEOUT* response. where T: Send + Sync + fmt::Debug + 'static
pub struct ErrorRequestTimeout<T>(pub T); {
ERROR_WRAP!(ErrorRequestTimeout<T>, StatusCode::REQUEST_TIMEOUT); fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&self.cause, f)
}
}
#[derive(Debug)] impl<T> ResponseError for InternalError<T>
/// Helper type that can wrap any error and generate *CONFLICT* response. where T: Send + Sync + fmt::Debug + 'static
pub struct ErrorConflict<T>(pub T); {
ERROR_WRAP!(ErrorConflict<T>, StatusCode::CONFLICT); fn error_response(&self) -> HttpResponse {
HttpResponse::new(self.status, Body::Empty)
}
}
#[derive(Debug)] impl<T> Responder for InternalError<T>
/// Helper type that can wrap any error and generate *GONE* response. where T: Send + Sync + fmt::Debug + 'static
pub struct ErrorGone<T>(pub T); {
ERROR_WRAP!(ErrorGone<T>, StatusCode::GONE); type Item = HttpResponse;
type Error = Error;
#[derive(Debug)] fn respond_to(self, _: HttpRequest) -> Result<HttpResponse, Error> {
/// Helper type that can wrap any error and generate *PRECONDITION FAILED* response. Err(self.into())
pub struct ErrorPreconditionFailed<T>(pub T); }
ERROR_WRAP!(ErrorPreconditionFailed<T>, StatusCode::PRECONDITION_FAILED); }
#[derive(Debug)] /// Helper function that creates wrapper of any error and generate *BAD REQUEST* response.
/// Helper type that can wrap any error and generate *EXPECTATION FAILED* response. #[allow(non_snake_case)]
pub struct ErrorExpectationFailed<T>(pub T); pub fn ErrorBadRequest<T>(err: T) -> InternalError<T> {
ERROR_WRAP!(ErrorExpectationFailed<T>, StatusCode::EXPECTATION_FAILED); InternalError::new(err, StatusCode::BAD_REQUEST)
}
#[derive(Debug)] /// Helper function that creates wrapper of any error and generate *UNAUTHORIZED* response.
/// Helper type that can wrap any error and generate *INTERNAL SERVER ERROR* response. #[allow(non_snake_case)]
pub struct ErrorInternalServerError<T>(pub T); pub fn ErrorUnauthorized<T>(err: T) -> InternalError<T> {
ERROR_WRAP!(ErrorInternalServerError<T>, StatusCode::INTERNAL_SERVER_ERROR); InternalError::new(err, StatusCode::UNAUTHORIZED)
}
/// Helper function that creates wrapper of any error and generate *FORBIDDEN* response.
#[allow(non_snake_case)]
pub fn ErrorForbidden<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::FORBIDDEN)
}
/// Helper function that creates wrapper of any error and generate *NOT FOUND* response.
#[allow(non_snake_case)]
pub fn ErrorNotFound<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::NOT_FOUND)
}
/// Helper function that creates wrapper of any error and generate *METHOD NOT ALLOWED* response.
#[allow(non_snake_case)]
pub fn ErrorMethodNotAllowed<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::METHOD_NOT_ALLOWED)
}
/// Helper function that creates wrapper of any error and generate *REQUEST TIMEOUT* response.
#[allow(non_snake_case)]
pub fn ErrorRequestTimeout<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::REQUEST_TIMEOUT)
}
/// Helper function that creates wrapper of any error and generate *CONFLICT* response.
#[allow(non_snake_case)]
pub fn ErrorConflict<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::CONFLICT)
}
/// Helper function that creates wrapper of any error and generate *GONE* response.
#[allow(non_snake_case)]
pub fn ErrorGone<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::GONE)
}
/// Helper function that creates wrapper of any error and generate *PRECONDITION FAILED* response.
#[allow(non_snake_case)]
pub fn ErrorPreconditionFailed<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::PRECONDITION_FAILED)
}
/// Helper function that creates wrapper of any error and generate *EXPECTATION FAILED* response.
#[allow(non_snake_case)]
pub fn ErrorExpectationFailed<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::EXPECTATION_FAILED)
}
/// Helper function that creates wrapper of any error and generate *INTERNAL SERVER ERROR* response.
#[allow(non_snake_case)]
pub fn ErrorInternalServerError<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::INTERNAL_SERVER_ERROR)
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {

View File

@@ -9,8 +9,10 @@ use std::path::{Path, PathBuf};
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
use mime_guess::get_mime_type; use mime_guess::get_mime_type;
use param::FromParam; use param::FromParam;
use handler::{Handler, Responder}; use handler::{Handler, Responder};
use headers::ContentEncoding;
use httprequest::HttpRequest; use httprequest::HttpRequest;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
use httpcodes::HTTPOk; use httpcodes::HTTPOk;
@@ -83,7 +85,6 @@ impl Responder for NamedFile {
fn respond_to(mut self, _: HttpRequest) -> Result<HttpResponse, io::Error> { fn respond_to(mut self, _: HttpRequest) -> Result<HttpResponse, io::Error> {
let mut resp = HTTPOk.build(); let mut resp = HTTPOk.build();
use headers::ContentEncoding;
resp.content_encoding(ContentEncoding::Identity); resp.content_encoding(ContentEncoding::Identity);
if let Some(ext) = self.path().extension() { if let Some(ext) = self.path().extension() {
let mime = get_mime_type(&ext.to_string_lossy()); let mime = get_mime_type(&ext.to_string_lossy());
@@ -138,7 +139,7 @@ impl Responder for Directory {
for entry in self.path.read_dir()? { for entry in self.path.read_dir()? {
if self.can_list(&entry) { if self.can_list(&entry) {
let entry = entry.unwrap(); let entry = entry.unwrap();
let p = match entry.path().strip_prefix(&self.base) { let p = match entry.path().strip_prefix(&self.path) {
Ok(p) => base.join(p), Ok(p) => base.join(p),
Err(_) => continue Err(_) => continue
}; };

View File

@@ -9,7 +9,7 @@ use error::Error;
use httprequest::HttpRequest; use httprequest::HttpRequest;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
/// Trait defines object that could be regestered as route handler /// Trait defines object that could be registered as route handler
#[allow(unused_variables)] #[allow(unused_variables)]
pub trait Handler<S>: 'static { pub trait Handler<S>: 'static {
@@ -35,7 +35,7 @@ pub trait Responder {
} }
#[doc(hidden)] #[doc(hidden)]
/// Convinience trait that convert `Future` object into `Boxed` future /// Convenience trait that convert `Future` object into `Boxed` future
pub trait AsyncResponder<I, E>: Sized { pub trait AsyncResponder<I, E>: Sized {
fn responder(self) -> Box<Future<Item=I, Error=E>>; fn responder(self) -> Box<Future<Item=I, Error=E>>;
} }
@@ -193,7 +193,7 @@ impl<I, E> Responder for Box<Future<Item=I, Error=E>>
} }
} }
/// Trait defines object that could be regestered as resource route /// Trait defines object that could be registered as resource route
pub(crate) trait RouteHandler<S>: 'static { pub(crate) trait RouteHandler<S>: 'static {
fn handle(&mut self, req: HttpRequest<S>) -> Reply; fn handle(&mut self, req: HttpRequest<S>) -> Reply;
} }
@@ -341,7 +341,7 @@ impl Default for NormalizePath {
} }
impl NormalizePath { impl NormalizePath {
/// Create new `NoramlizePath` instance /// Create new `NormalizePath` instance
pub fn new(append: bool, merge: bool, redirect: StatusCode) -> NormalizePath { pub fn new(append: bool, merge: bool, redirect: StatusCode) -> NormalizePath {
NormalizePath { NormalizePath {
append: append, append: append,

View File

@@ -66,84 +66,6 @@ impl fmt::Write for CachedDate {
} }
} }
/// Internal use only! unsafe
#[derive(Debug)]
pub(crate) struct SharedBytesPool(RefCell<VecDeque<Rc<BytesMut>>>);
impl SharedBytesPool {
pub fn new() -> SharedBytesPool {
SharedBytesPool(RefCell::new(VecDeque::with_capacity(128)))
}
pub fn get_bytes(&self) -> Rc<BytesMut> {
if let Some(bytes) = self.0.borrow_mut().pop_front() {
bytes
} else {
Rc::new(BytesMut::new())
}
}
pub fn release_bytes(&self, mut bytes: Rc<BytesMut>) {
let v = &mut self.0.borrow_mut();
if v.len() < 128 {
Rc::get_mut(&mut bytes).unwrap().take();
v.push_front(bytes);
}
}
}
#[derive(Debug)]
pub(crate) struct SharedBytes(
Option<Rc<BytesMut>>, Option<Rc<SharedBytesPool>>);
impl Drop for SharedBytes {
fn drop(&mut self) {
if let Some(ref pool) = self.1 {
if let Some(bytes) = self.0.take() {
if Rc::strong_count(&bytes) == 1 {
pool.release_bytes(bytes);
}
}
}
}
}
impl SharedBytes {
pub fn empty() -> Self {
SharedBytes(None, None)
}
pub fn new(bytes: Rc<BytesMut>, pool: Rc<SharedBytesPool>) -> SharedBytes {
SharedBytes(Some(bytes), Some(pool))
}
#[inline(always)]
#[allow(mutable_transmutes)]
#[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref, inline_always))]
pub fn get_mut(&self) -> &mut BytesMut {
let r: &BytesMut = self.0.as_ref().unwrap().as_ref();
unsafe{mem::transmute(r)}
}
#[inline]
pub fn get_ref(&self) -> &BytesMut {
self.0.as_ref().unwrap()
}
}
impl Default for SharedBytes {
fn default() -> Self {
SharedBytes(Some(Rc::new(BytesMut::new())), None)
}
}
impl Clone for SharedBytes {
fn clone(&self) -> SharedBytes {
SharedBytes(self.0.clone(), self.1.clone())
}
}
/// Internal use only! unsafe /// Internal use only! unsafe
pub(crate) struct SharedMessagePool(RefCell<VecDeque<Rc<HttpMessage>>>); pub(crate) struct SharedMessagePool(RefCell<VecDeque<Rc<HttpMessage>>>);

View File

@@ -222,7 +222,7 @@ impl<S> HttpRequest<S> {
self.uri().path() self.uri().path()
} }
/// Get *ConnectionInfo* for currect request. /// Get *ConnectionInfo* for correct request.
pub fn connection_info(&self) -> &ConnectionInfo { pub fn connection_info(&self) -> &ConnectionInfo {
if self.as_ref().info.is_none() { if self.as_ref().info.is_none() {
let info: ConnectionInfo<'static> = unsafe{ let info: ConnectionInfo<'static> = unsafe{
@@ -278,7 +278,7 @@ impl<S> HttpRequest<S> {
/// Peer socket address /// Peer socket address
/// ///
/// Peer address is actuall socket address, if proxy is used in front of /// Peer address is actual socket address, if proxy is used in front of
/// actix http server, then peer address would be address of this proxy. /// actix http server, then peer address would be address of this proxy.
/// ///
/// To get client connection information `connection_info()` method should be used. /// To get client connection information `connection_info()` method should be used.

View File

@@ -158,14 +158,14 @@ impl HttpResponse {
/// is chunked encoding enabled /// is chunked encoding enabled
#[inline] #[inline]
pub fn chunked(&self) -> bool { pub fn chunked(&self) -> Option<bool> {
self.get_ref().chunked self.get_ref().chunked
} }
/// Content encoding /// Content encoding
#[inline] #[inline]
pub fn content_encoding(&self) -> &ContentEncoding { pub fn content_encoding(&self) -> ContentEncoding {
&self.get_ref().encoding self.get_ref().encoding
} }
/// Set content encoding /// Set content encoding
@@ -329,7 +329,16 @@ impl HttpResponseBuilder {
#[inline] #[inline]
pub fn chunked(&mut self) -> &mut Self { pub fn chunked(&mut self) -> &mut Self {
if let Some(parts) = parts(&mut self.response, &self.err) { if let Some(parts) = parts(&mut self.response, &self.err) {
parts.chunked = true; parts.chunked = Some(true);
}
self
}
/// Force disable chunked encoding
#[inline]
pub fn no_chunking(&mut self) -> &mut Self {
if let Some(parts) = parts(&mut self.response, &self.err) {
parts.chunked = Some(false);
} }
self self
} }
@@ -414,8 +423,8 @@ impl HttpResponseBuilder {
} }
/// This method calls provided closure with builder reference if value is Some. /// This method calls provided closure with builder reference if value is Some.
pub fn if_some<T, F>(&mut self, value: Option<&T>, f: F) -> &mut Self pub fn if_some<T, F>(&mut self, value: Option<T>, f: F) -> &mut Self
where F: FnOnce(&T, &mut HttpResponseBuilder) where F: FnOnce(T, &mut HttpResponseBuilder)
{ {
if let Some(val) = value { if let Some(val) = value {
f(val, self); f(val, self);
@@ -641,7 +650,7 @@ struct InnerHttpResponse {
status: StatusCode, status: StatusCode,
reason: Option<&'static str>, reason: Option<&'static str>,
body: Body, body: Body,
chunked: bool, chunked: Option<bool>,
encoding: ContentEncoding, encoding: ContentEncoding,
connection_type: Option<ConnectionType>, connection_type: Option<ConnectionType>,
response_size: u64, response_size: u64,
@@ -658,7 +667,7 @@ impl InnerHttpResponse {
status: status, status: status,
reason: None, reason: None,
body: body, body: body,
chunked: false, chunked: None,
encoding: ContentEncoding::Auto, encoding: ContentEncoding::Auto,
connection_type: None, connection_type: None,
response_size: 0, response_size: 0,
@@ -709,7 +718,7 @@ impl Pool {
if v.len() < 128 { if v.len() < 128 {
inner.headers.clear(); inner.headers.clear();
inner.version = None; inner.version = None;
inner.chunked = false; inner.chunked = None;
inner.reason = None; inner.reason = None;
inner.encoding = ContentEncoding::Auto; inner.encoding = ContentEncoding::Auto;
inner.connection_type = None; inner.connection_type = None;
@@ -803,11 +812,11 @@ mod tests {
#[test] #[test]
fn test_content_encoding() { fn test_content_encoding() {
let resp = HttpResponse::build(StatusCode::OK).finish().unwrap(); let resp = HttpResponse::build(StatusCode::OK).finish().unwrap();
assert_eq!(*resp.content_encoding(), ContentEncoding::Auto); assert_eq!(resp.content_encoding(), ContentEncoding::Auto);
let resp = HttpResponse::build(StatusCode::OK) let resp = HttpResponse::build(StatusCode::OK)
.content_encoding(ContentEncoding::Br).finish().unwrap(); .content_encoding(ContentEncoding::Br).finish().unwrap();
assert_eq!(*resp.content_encoding(), ContentEncoding::Br); assert_eq!(resp.content_encoding(), ContentEncoding::Br);
} }
#[test] #[test]

View File

@@ -100,7 +100,7 @@ pub enum CorsBuilderError {
ParseError(http::Error), ParseError(http::Error),
/// Credentials are allowed, but the Origin is set to "*". This is not allowed by W3C /// Credentials are allowed, but the Origin is set to "*". This is not allowed by W3C
/// ///
/// This is a misconfiguration. Check the docuemntation for `Cors`. /// This is a misconfiguration. Check the documentation for `Cors`.
#[fail(display="Credentials are allowed, but the Origin is set to \"*\"")] #[fail(display="Credentials are allowed, but the Origin is set to \"*\"")]
CredentialsWithWildcardOrigin, CredentialsWithWildcardOrigin,
} }
@@ -214,7 +214,7 @@ impl Cors {
/// This method register cors middleware with resource and /// This method register cors middleware with resource and
/// adds route for *OPTIONS* preflight requests. /// adds route for *OPTIONS* preflight requests.
/// ///
/// It is possible to register *Cors* middlware with `Resource::middleware()` /// It is possible to register *Cors* middleware with `Resource::middleware()`
/// method, but in that case *Cors* middleware wont be able to handle *OPTIONS* /// method, but in that case *Cors* middleware wont be able to handle *OPTIONS*
/// requests. /// requests.
pub fn register<S: 'static>(self, resource: &mut Resource<S>) { pub fn register<S: 'static>(self, resource: &mut Resource<S>) {
@@ -295,16 +295,23 @@ impl<S> Middleware<S> for Cors {
self.validate_allowed_method(req)?; self.validate_allowed_method(req)?;
self.validate_allowed_headers(req)?; self.validate_allowed_headers(req)?;
// allowed headers
let headers = if let Some(headers) = self.headers.as_ref() {
Some(HeaderValue::try_from(&headers.iter().fold(
String::new(), |s, v| s + "," + v.as_str()).as_str()[1..]).unwrap())
} else if let Some(hdr) = req.headers().get(header::ACCESS_CONTROL_REQUEST_HEADERS) {
Some(hdr.clone())
} else {
None
};
Ok(Started::Response( Ok(Started::Response(
HTTPOk.build() HTTPOk.build()
.if_some(self.max_age.as_ref(), |max_age, resp| { .if_some(self.max_age.as_ref(), |max_age, resp| {
let _ = resp.header( let _ = resp.header(
header::ACCESS_CONTROL_MAX_AGE, format!("{}", max_age).as_str());}) header::ACCESS_CONTROL_MAX_AGE, format!("{}", max_age).as_str());})
.if_some(self.headers.as_ref(), |headers, resp| { .if_some(headers, |headers, resp| {
let _ = resp.header( let _ = resp.header(header::ACCESS_CONTROL_ALLOW_HEADERS, headers); })
header::ACCESS_CONTROL_ALLOW_HEADERS,
&headers.iter().fold(
String::new(), |s, v| s + "," + v.as_str()).as_str()[1..]);})
.if_true(self.origins.is_all(), |resp| { .if_true(self.origins.is_all(), |resp| {
if self.send_wildcard { if self.send_wildcard {
resp.header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*"); resp.header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*");
@@ -536,7 +543,7 @@ impl CorsBuilder {
} }
/// Set a list of headers which are safe to expose to the API of a CORS API specification. /// Set a list of headers which are safe to expose to the API of a CORS API specification.
/// This corresponds to the `Access-Control-Expose-Headers` responde header. /// This corresponds to the `Access-Control-Expose-Headers` response header.
/// ///
/// This is the `list of exposed headers` in the /// This is the `list of exposed headers` in the
/// [Resource Processing Model](https://www.w3.org/TR/cors/#resource-processing-model). /// [Resource Processing Model](https://www.w3.org/TR/cors/#resource-processing-model).
@@ -584,7 +591,6 @@ impl CorsBuilder {
/// in an `Error::CredentialsWithWildcardOrigin` error during actix launch or runtime. /// in an `Error::CredentialsWithWildcardOrigin` error during actix launch or runtime.
/// ///
/// Defaults to `false`. /// Defaults to `false`.
#[cfg_attr(feature = "serialization", serde(default))]
pub fn send_wildcard(&mut self) -> &mut CorsBuilder { pub fn send_wildcard(&mut self) -> &mut CorsBuilder {
if let Some(cors) = cors(&mut self.cors, &self.error) { if let Some(cors) = cors(&mut self.cors, &self.error) {
cors.send_wildcard = true cors.send_wildcard = true

View File

@@ -217,7 +217,7 @@ pub struct CookieSession {
inner: Rc<CookieSessionInner>, inner: Rc<CookieSessionInner>,
} }
/// Errors that can occure during handling cookie session /// Errors that can occur during handling cookie session
#[derive(Fail, Debug)] #[derive(Fail, Debug)]
pub enum CookieSessionError { pub enum CookieSessionError {
/// Size of the serialized session is greater than 4000 bytes. /// Size of the serialized session is greater than 4000 bytes.

View File

@@ -6,7 +6,7 @@ use std::slice::Iter;
use std::borrow::Cow; use std::borrow::Cow;
use smallvec::SmallVec; use smallvec::SmallVec;
use error::{ResponseError, UriSegmentError, ErrorBadRequest}; use error::{ResponseError, UriSegmentError, InternalError, ErrorBadRequest};
/// A trait to abstract the idea of creating a new instance of a type from a path parameter. /// A trait to abstract the idea of creating a new instance of a type from a path parameter.
@@ -77,7 +77,7 @@ impl<'a> Params<'a> {
} }
} }
/// Return iterator to items in paramter container /// Return iterator to items in parameter container
pub fn iter(&self) -> Iter<(Cow<'a, str>, Cow<'a, str>)> { pub fn iter(&self) -> Iter<(Cow<'a, str>, Cow<'a, str>)> {
self.0.iter() self.0.iter()
} }
@@ -141,7 +141,7 @@ impl FromParam for PathBuf {
macro_rules! FROM_STR { macro_rules! FROM_STR {
($type:ty) => { ($type:ty) => {
impl FromParam for $type { impl FromParam for $type {
type Err = ErrorBadRequest<<$type as FromStr>::Err>; type Err = InternalError<<$type as FromStr>::Err>;
fn from_param(val: &str) -> Result<Self, Self::Err> { fn from_param(val: &str) -> Result<Self, Self::Err> {
<$type as FromStr>::from_str(val).map_err(ErrorBadRequest) <$type as FromStr>::from_str(val).map_err(ErrorBadRequest)

View File

@@ -3,6 +3,7 @@ use std::rc::Rc;
use std::cell::RefCell; use std::cell::RefCell;
use std::marker::PhantomData; use std::marker::PhantomData;
use log::Level::Debug;
use futures::{Async, Poll, Future, Stream}; use futures::{Async, Poll, Future, Stream};
use futures::unsync::oneshot; use futures::unsync::oneshot;
@@ -56,7 +57,7 @@ impl<S: 'static, H: PipelineHandler<S>> PipelineState<S, H> {
struct PipelineInfo<S> { struct PipelineInfo<S> {
req: HttpRequest<S>, req: HttpRequest<S>,
count: usize, count: u16,
mws: Rc<Vec<Box<Middleware<S>>>>, mws: Rc<Vec<Box<Middleware<S>>>>,
context: Option<Box<ActorHttpContext>>, context: Option<Box<ActorHttpContext>>,
error: Option<Error>, error: Option<Error>,
@@ -210,14 +211,14 @@ impl<S: 'static, H: PipelineHandler<S>> StartMiddlewares<S, H> {
fn init(info: &mut PipelineInfo<S>, handler: Rc<RefCell<H>>) -> PipelineState<S, H> { fn init(info: &mut PipelineInfo<S>, handler: Rc<RefCell<H>>) -> PipelineState<S, H> {
// execute middlewares, we need this stage because middlewares could be non-async // execute middlewares, we need this stage because middlewares could be non-async
// and we can move to next state immidietly // and we can move to next state immediately
let len = info.mws.len(); let len = info.mws.len() as u16;
loop { loop {
if info.count == len { if info.count == len {
let reply = handler.borrow_mut().handle(info.req.clone()); let reply = handler.borrow_mut().handle(info.req.clone());
return WaitingResponse::init(info, reply) return WaitingResponse::init(info, reply)
} else { } else {
match info.mws[info.count].start(&mut info.req) { match info.mws[info.count as usize].start(&mut info.req) {
Ok(Started::Done) => Ok(Started::Done) =>
info.count += 1, info.count += 1,
Ok(Started::Response(resp)) => Ok(Started::Response(resp)) =>
@@ -246,7 +247,7 @@ impl<S: 'static, H: PipelineHandler<S>> StartMiddlewares<S, H> {
} }
fn poll(&mut self, info: &mut PipelineInfo<S>) -> Option<PipelineState<S, H>> { fn poll(&mut self, info: &mut PipelineInfo<S>) -> Option<PipelineState<S, H>> {
let len = info.mws.len(); let len = info.mws.len() as u16;
'outer: loop { 'outer: loop {
match self.fut.as_mut().unwrap().poll() { match self.fut.as_mut().unwrap().poll() {
Ok(Async::NotReady) => return None, Ok(Async::NotReady) => return None,
@@ -260,7 +261,7 @@ impl<S: 'static, H: PipelineHandler<S>> StartMiddlewares<S, H> {
return Some(WaitingResponse::init(info, reply)); return Some(WaitingResponse::init(info, reply));
} else { } else {
loop { loop {
match info.mws[info.count].start(info.req_mut()) { match info.mws[info.count as usize].start(info.req_mut()) {
Ok(Started::Done) => Ok(Started::Done) =>
info.count += 1, info.count += 1,
Ok(Started::Response(resp)) => { Ok(Started::Response(resp)) => {
@@ -334,7 +335,7 @@ impl<S: 'static, H> RunMiddlewares<S, H> {
loop { loop {
resp = match info.mws[curr].response(info.req_mut(), resp) { resp = match info.mws[curr].response(info.req_mut(), resp) {
Err(err) => { Err(err) => {
info.count = curr + 1; info.count = (curr + 1) as u16;
return ProcessResponse::init(err.into()) return ProcessResponse::init(err.into())
} }
Ok(Response::Done(r)) => { Ok(Response::Done(r)) => {
@@ -439,8 +440,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
ProcessResponse{ resp: resp, ProcessResponse{ resp: resp,
iostate: IOState::Response, iostate: IOState::Response,
running: RunningState::Running, running: RunningState::Running,
drain: None, drain: None, _s: PhantomData, _h: PhantomData})
_s: PhantomData, _h: PhantomData})
} }
fn poll_io(mut self, io: &mut Writer, info: &mut PipelineInfo<S>) fn poll_io(mut self, io: &mut Writer, info: &mut PipelineInfo<S>)
@@ -448,7 +448,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
{ {
if self.drain.is_none() && self.running != RunningState::Paused { if self.drain.is_none() && self.running != RunningState::Paused {
// if task is paused, write buffer is probably full // if task is paused, write buffer is probably full
loop { 'outter: loop {
let result = match mem::replace(&mut self.iostate, IOState::Done) { let result = match mem::replace(&mut self.iostate, IOState::Done) {
IOState::Response => { IOState::Response => {
let result = match io.start(info.req_mut().get_inner(), &mut self.resp) { let result = match io.start(info.req_mut().get_inner(), &mut self.resp) {
@@ -459,6 +459,13 @@ impl<S: 'static, H> ProcessResponse<S, H> {
} }
}; };
if let Some(err) = self.resp.error() {
warn!("Error occured during request handling: {}", err);
if log_enabled!(Debug) {
debug!("{:?}", err);
}
}
match self.resp.replace_body(Body::Empty) { match self.resp.replace_body(Body::Empty) {
Body::Streaming(stream) => Body::Streaming(stream) =>
self.iostate = IOState::Payload(stream), self.iostate = IOState::Payload(stream),
@@ -481,7 +488,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
}, },
Ok(Async::Ready(Some(chunk))) => { Ok(Async::Ready(Some(chunk))) => {
self.iostate = IOState::Payload(body); self.iostate = IOState::Payload(body);
match io.write(chunk.as_ref()) { match io.write(chunk.into()) {
Err(err) => { Err(err) => {
info.error = Some(err.into()); info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(info, self.resp)) return Ok(FinishingMiddlewares::init(info, self.resp))
@@ -504,7 +511,13 @@ impl<S: 'static, H> ProcessResponse<S, H> {
ctx.disconnected(); ctx.disconnected();
} }
match ctx.poll() { match ctx.poll() {
Ok(Async::Ready(Some(frame))) => { Ok(Async::Ready(Some(vec))) => {
if vec.is_empty() {
self.iostate = IOState::Actor(ctx);
break
}
let mut res = None;
for frame in vec {
match frame { match frame {
Frame::Chunk(None) => { Frame::Chunk(None) => {
info.context = Some(ctx); info.context = Some(ctx);
@@ -514,25 +527,28 @@ impl<S: 'static, H> ProcessResponse<S, H> {
return Ok( return Ok(
FinishingMiddlewares::init(info, self.resp)) FinishingMiddlewares::init(info, self.resp))
} }
break break 'outter
}, },
Frame::Chunk(Some(chunk)) => { Frame::Chunk(Some(chunk)) => {
self.iostate = IOState::Actor(ctx); match io.write(chunk) {
match io.write(chunk.as_ref()) {
Err(err) => { Err(err) => {
info.error = Some(err.into()); info.error = Some(err.into());
return Ok( return Ok(
FinishingMiddlewares::init(info, self.resp)) FinishingMiddlewares::init(info, self.resp))
}, },
Ok(result) => result Ok(result) => res = Some(result),
} }
}, },
Frame::Drain(fut) => { Frame::Drain(fut) =>
self.drain = Some(fut); self.drain = Some(fut),
}
}
self.iostate = IOState::Actor(ctx); self.iostate = IOState::Actor(ctx);
break if self.drain.is_some() {
} self.running.resume();
break 'outter
} }
res.unwrap()
}, },
Ok(Async::Ready(None)) => { Ok(Async::Ready(None)) => {
self.iostate = IOState::Done; self.iostate = IOState::Done;
@@ -567,16 +583,6 @@ impl<S: 'static, H> ProcessResponse<S, H> {
if self.running == RunningState::Paused || self.drain.is_some() { if self.running == RunningState::Paused || self.drain.is_some() {
match io.poll_completed(false) { match io.poll_completed(false) {
Ok(Async::Ready(_)) => { Ok(Async::Ready(_)) => {
match io.flush() {
Ok(Async::Ready(_)) => (),
Ok(Async::NotReady) => return Err(PipelineState::Response(self)),
Err(err) => {
debug!("Error sending data: {}", err);
info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(info, self.resp))
}
}
self.running.resume(); self.running.resume();
// resolve drain futures // resolve drain futures
@@ -588,7 +594,6 @@ impl<S: 'static, H> ProcessResponse<S, H> {
}, },
Ok(Async::NotReady) => return Err(PipelineState::Response(self)), Ok(Async::NotReady) => return Err(PipelineState::Response(self)),
Err(err) => { Err(err) => {
debug!("Error sending data: {}", err);
info.error = Some(err.into()); info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(info, self.resp)) return Ok(FinishingMiddlewares::init(info, self.resp))
} }
@@ -601,7 +606,6 @@ impl<S: 'static, H> ProcessResponse<S, H> {
match io.write_eof() { match io.write_eof() {
Ok(_) => (), Ok(_) => (),
Err(err) => { Err(err) => {
debug!("Error sending data: {}", err);
info.error = Some(err.into()); info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(info, self.resp)) return Ok(FinishingMiddlewares::init(info, self.resp))
} }
@@ -663,7 +667,7 @@ impl<S: 'static, H> FinishingMiddlewares<S, H> {
self.fut = None; self.fut = None;
info.count -= 1; info.count -= 1;
match info.mws[info.count].finish(info.req_mut(), &self.resp) { match info.mws[info.count as usize].finish(info.req_mut(), &self.resp) {
Finished::Done => { Finished::Done => {
if info.count == 0 { if info.count == 0 {
return Some(Completed::init(info)) return Some(Completed::init(info))
@@ -677,12 +681,17 @@ impl<S: 'static, H> FinishingMiddlewares<S, H> {
} }
} }
#[derive(Debug)]
struct Completed<S, H>(PhantomData<S>, PhantomData<H>); struct Completed<S, H>(PhantomData<S>, PhantomData<H>);
impl<S, H> Completed<S, H> { impl<S, H> Completed<S, H> {
#[inline] #[inline]
fn init(info: &mut PipelineInfo<S>) -> PipelineState<S, H> { fn init(info: &mut PipelineInfo<S>) -> PipelineState<S, H> {
if let Some(ref err) = info.error {
error!("Error occured during request handling: {}", err);
}
if info.context.is_none() { if info.context.is_none() {
PipelineState::None PipelineState::None
} else { } else {

View File

@@ -19,7 +19,7 @@ use httpresponse::HttpResponse;
/// Route uses builder-like pattern for configuration. /// Route uses builder-like pattern for configuration.
/// During request handling, resource object iterate through all routes /// During request handling, resource object iterate through all routes
/// and check all predicates for specific route, if request matches all predicates route /// and check all predicates for specific route, if request matches all predicates route
/// route considired matched and route handler get called. /// route considered matched and route handler get called.
/// ///
/// ```rust /// ```rust
/// # extern crate actix_web; /// # extern crate actix_web;

View File

@@ -3,7 +3,7 @@ use std::io::{Read, Write};
use std::fmt::Write as FmtWrite; use std::fmt::Write as FmtWrite;
use std::str::FromStr; use std::str::FromStr;
use http::Version; use http::{Version, Method, HttpTryFrom};
use http::header::{HeaderMap, HeaderValue, use http::header::{HeaderMap, HeaderValue,
ACCEPT_ENCODING, CONNECTION, ACCEPT_ENCODING, CONNECTION,
CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING}; CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING};
@@ -16,11 +16,13 @@ use bytes::{Bytes, BytesMut, BufMut, Writer};
use headers::ContentEncoding; use headers::ContentEncoding;
use body::{Body, Binary}; use body::{Body, Binary};
use error::PayloadError; use error::PayloadError;
use helpers::SharedBytes;
use httprequest::HttpMessage; use httprequest::HttpMessage;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
use payload::{PayloadSender, PayloadWriter}; use payload::{PayloadSender, PayloadWriter};
use super::shared::SharedBytes;
impl ContentEncoding { impl ContentEncoding {
#[inline] #[inline]
@@ -344,15 +346,17 @@ impl PayloadEncoder {
pub fn new(buf: SharedBytes, req: &HttpMessage, resp: &mut HttpResponse) -> PayloadEncoder { pub fn new(buf: SharedBytes, req: &HttpMessage, resp: &mut HttpResponse) -> PayloadEncoder {
let version = resp.version().unwrap_or_else(|| req.version); let version = resp.version().unwrap_or_else(|| req.version);
let mut body = resp.replace_body(Body::Empty); let mut body = resp.replace_body(Body::Empty);
let response_encoding = resp.content_encoding();
let has_body = match body { let has_body = match body {
Body::Empty => false, Body::Empty => false,
Body::Binary(ref bin) => bin.len() >= 512, Body::Binary(ref bin) =>
!(response_encoding == ContentEncoding::Auto && bin.len() < 96),
_ => true, _ => true,
}; };
// Enable content encoding only if response does not contain Content-Encoding header // Enable content encoding only if response does not contain Content-Encoding header
let mut encoding = if has_body { let mut encoding = if has_body {
let encoding = match *resp.content_encoding() { let encoding = match response_encoding {
ContentEncoding::Auto => { ContentEncoding::Auto => {
// negotiate content-encoding // negotiate content-encoding
if let Some(val) = req.headers.get(ACCEPT_ENCODING) { if let Some(val) = req.headers.get(ACCEPT_ENCODING) {
@@ -376,13 +380,12 @@ impl PayloadEncoder {
ContentEncoding::Identity ContentEncoding::Identity
}; };
let transfer = match body { let mut transfer = match body {
Body::Empty => { Body::Empty => {
if resp.chunked() { if req.method != Method::HEAD {
error!("Chunked transfer is enabled but body is set to Empty");
}
resp.headers_mut().remove(CONTENT_LENGTH); resp.headers_mut().remove(CONTENT_LENGTH);
TransferEncoding::eof(buf) }
TransferEncoding::length(0, buf)
}, },
Body::Binary(ref mut bytes) => { Body::Binary(ref mut bytes) => {
if encoding.is_compression() { if encoding.is_compression() {
@@ -399,13 +402,20 @@ impl PayloadEncoder {
ContentEncoding::Auto => unreachable!() ContentEncoding::Auto => unreachable!()
}; };
// TODO return error! // TODO return error!
let _ = enc.write(bytes.as_ref()); let _ = enc.write(bytes.clone());
let _ = enc.write_eof(); let _ = enc.write_eof();
*bytes = Binary::from(tmp.get_mut().take()); *bytes = Binary::from(tmp.take());
encoding = ContentEncoding::Identity; encoding = ContentEncoding::Identity;
} }
if req.method == Method::HEAD {
let mut b = BytesMut::new();
let _ = write!(b, "{}", bytes.len());
resp.headers_mut().insert(
CONTENT_LENGTH, HeaderValue::try_from(b.freeze()).unwrap());
} else {
resp.headers_mut().remove(CONTENT_LENGTH); resp.headers_mut().remove(CONTENT_LENGTH);
}
TransferEncoding::eof(buf) TransferEncoding::eof(buf)
} }
Body::Streaming(_) | Body::Actor(_) => { Body::Streaming(_) | Body::Actor(_) => {
@@ -426,7 +436,12 @@ impl PayloadEncoder {
} }
} }
}; };
//
if req.method == Method::HEAD {
transfer.kind = TransferEncodingKind::Length(0);
} else {
resp.replace_body(body); resp.replace_body(body);
}
PayloadEncoder( PayloadEncoder(
match encoding { match encoding {
@@ -444,7 +459,8 @@ impl PayloadEncoder {
fn streaming_encoding(buf: SharedBytes, version: Version, fn streaming_encoding(buf: SharedBytes, version: Version,
resp: &mut HttpResponse) -> TransferEncoding { resp: &mut HttpResponse) -> TransferEncoding {
if resp.chunked() { match resp.chunked() {
Some(true) => {
// Enable transfer encoding // Enable transfer encoding
resp.headers_mut().remove(CONTENT_LENGTH); resp.headers_mut().remove(CONTENT_LENGTH);
if version == Version::HTTP_2 { if version == Version::HTTP_2 {
@@ -455,7 +471,10 @@ impl PayloadEncoder {
TRANSFER_ENCODING, HeaderValue::from_static("chunked")); TRANSFER_ENCODING, HeaderValue::from_static("chunked"));
TransferEncoding::chunked(buf) TransferEncoding::chunked(buf)
} }
} else { },
Some(false) =>
TransferEncoding::eof(buf),
None => {
// if Content-Length is specified, then use it as length hint // if Content-Length is specified, then use it as length hint
let (len, chunked) = let (len, chunked) =
if let Some(len) = resp.headers().get(CONTENT_LENGTH) { if let Some(len) = resp.headers().get(CONTENT_LENGTH) {
@@ -483,14 +502,17 @@ impl PayloadEncoder {
} }
} else { } else {
// Enable transfer encoding // Enable transfer encoding
resp.headers_mut().remove(CONTENT_LENGTH); match version {
if version == Version::HTTP_2 { Version::HTTP_11 => {
resp.headers_mut().remove(TRANSFER_ENCODING);
TransferEncoding::eof(buf)
} else {
resp.headers_mut().insert( resp.headers_mut().insert(
TRANSFER_ENCODING, HeaderValue::from_static("chunked")); TRANSFER_ENCODING, HeaderValue::from_static("chunked"));
TransferEncoding::chunked(buf) TransferEncoding::chunked(buf)
},
_ => {
resp.headers_mut().remove(TRANSFER_ENCODING);
TransferEncoding::eof(buf)
}
}
} }
} }
} }
@@ -499,16 +521,6 @@ impl PayloadEncoder {
impl PayloadEncoder { impl PayloadEncoder {
#[inline]
pub fn len(&self) -> usize {
self.0.get_ref().len()
}
#[inline]
pub fn get_mut(&mut self) -> &mut BytesMut {
self.0.get_mut()
}
#[inline] #[inline]
pub fn is_eof(&self) -> bool { pub fn is_eof(&self) -> bool {
self.0.is_eof() self.0.is_eof()
@@ -516,7 +528,7 @@ impl PayloadEncoder {
#[cfg_attr(feature = "cargo-clippy", allow(inline_always))] #[cfg_attr(feature = "cargo-clippy", allow(inline_always))]
#[inline(always)] #[inline(always)]
pub fn write(&mut self, payload: &[u8]) -> Result<(), io::Error> { pub fn write(&mut self, payload: Binary) -> Result<(), io::Error> {
self.0.write(payload) self.0.write(payload)
} }
@@ -539,42 +551,10 @@ impl ContentEncoder {
#[inline] #[inline]
pub fn is_eof(&self) -> bool { pub fn is_eof(&self) -> bool {
match *self { match *self {
ContentEncoder::Br(ref encoder) => ContentEncoder::Br(ref encoder) => encoder.get_ref().is_eof(),
encoder.get_ref().is_eof(), ContentEncoder::Deflate(ref encoder) => encoder.get_ref().is_eof(),
ContentEncoder::Deflate(ref encoder) => ContentEncoder::Gzip(ref encoder) => encoder.get_ref().is_eof(),
encoder.get_ref().is_eof(), ContentEncoder::Identity(ref encoder) => encoder.is_eof(),
ContentEncoder::Gzip(ref encoder) =>
encoder.get_ref().is_eof(),
ContentEncoder::Identity(ref encoder) =>
encoder.is_eof(),
}
}
#[inline]
pub fn get_ref(&self) -> &BytesMut {
match *self {
ContentEncoder::Br(ref encoder) =>
encoder.get_ref().buffer.get_ref(),
ContentEncoder::Deflate(ref encoder) =>
encoder.get_ref().buffer.get_ref(),
ContentEncoder::Gzip(ref encoder) =>
encoder.get_ref().buffer.get_ref(),
ContentEncoder::Identity(ref encoder) =>
encoder.buffer.get_ref(),
}
}
#[inline]
pub fn get_mut(&mut self) -> &mut BytesMut {
match *self {
ContentEncoder::Br(ref mut encoder) =>
encoder.get_mut().buffer.get_mut(),
ContentEncoder::Deflate(ref mut encoder) =>
encoder.get_mut().buffer.get_mut(),
ContentEncoder::Gzip(ref mut encoder) =>
encoder.get_mut().buffer.get_mut(),
ContentEncoder::Identity(ref mut encoder) =>
encoder.buffer.get_mut(),
} }
} }
@@ -625,10 +605,10 @@ impl ContentEncoder {
#[cfg_attr(feature = "cargo-clippy", allow(inline_always))] #[cfg_attr(feature = "cargo-clippy", allow(inline_always))]
#[inline(always)] #[inline(always)]
pub fn write(&mut self, data: &[u8]) -> Result<(), io::Error> { pub fn write(&mut self, data: Binary) -> Result<(), io::Error> {
match *self { match *self {
ContentEncoder::Br(ref mut encoder) => { ContentEncoder::Br(ref mut encoder) => {
match encoder.write(data) { match encoder.write(data.as_ref()) {
Ok(_) => Ok(_) =>
encoder.flush(), encoder.flush(),
Err(err) => { Err(err) => {
@@ -638,7 +618,7 @@ impl ContentEncoder {
} }
}, },
ContentEncoder::Gzip(ref mut encoder) => { ContentEncoder::Gzip(ref mut encoder) => {
match encoder.write(data) { match encoder.write(data.as_ref()) {
Ok(_) => Ok(_) =>
encoder.flush(), encoder.flush(),
Err(err) => { Err(err) => {
@@ -648,7 +628,7 @@ impl ContentEncoder {
} }
} }
ContentEncoder::Deflate(ref mut encoder) => { ContentEncoder::Deflate(ref mut encoder) => {
match encoder.write(data) { match encoder.write(data.as_ref()) {
Ok(_) => Ok(_) =>
encoder.flush(), encoder.flush(),
Err(err) => { Err(err) => {
@@ -682,7 +662,7 @@ enum TransferEncodingKind {
Length(u64), Length(u64),
/// An Encoder for when Content-Length is not known. /// An Encoder for when Content-Length is not known.
/// ///
/// Appliction decides when to stop writing. /// Application decides when to stop writing.
Eof, Eof,
} }
@@ -723,11 +703,12 @@ impl TransferEncoding {
/// Encode message. Return `EOF` state of encoder /// Encode message. Return `EOF` state of encoder
#[inline] #[inline]
pub fn encode(&mut self, msg: &[u8]) -> io::Result<bool> { pub fn encode(&mut self, mut msg: Binary) -> io::Result<bool> {
match self.kind { match self.kind {
TransferEncodingKind::Eof => { TransferEncodingKind::Eof => {
self.buffer.get_mut().extend_from_slice(msg); let eof = msg.is_empty();
Ok(msg.is_empty()) self.buffer.extend(msg);
Ok(eof)
}, },
TransferEncodingKind::Chunked(ref mut eof) => { TransferEncodingKind::Chunked(ref mut eof) => {
if *eof { if *eof {
@@ -736,24 +717,31 @@ impl TransferEncoding {
if msg.is_empty() { if msg.is_empty() {
*eof = true; *eof = true;
self.buffer.get_mut().extend_from_slice(b"0\r\n\r\n"); self.buffer.extend_from_slice(b"0\r\n\r\n");
} else { } else {
write!(self.buffer.get_mut(), "{:X}\r\n", msg.len()) let mut buf = BytesMut::new();
write!(&mut buf, "{:X}\r\n", msg.len())
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
self.buffer.get_mut().extend_from_slice(msg); self.buffer.reserve(buf.len() + msg.len() + 2);
self.buffer.get_mut().extend_from_slice(b"\r\n"); self.buffer.extend(buf.into());
self.buffer.extend(msg);
self.buffer.extend_from_slice(b"\r\n");
} }
Ok(*eof) Ok(*eof)
}, },
TransferEncodingKind::Length(ref mut remaining) => { TransferEncodingKind::Length(ref mut remaining) => {
if *remaining > 0 {
if msg.is_empty() { if msg.is_empty() {
return Ok(*remaining == 0) return Ok(*remaining == 0)
} }
let max = cmp::min(*remaining, msg.len() as u64); let len = cmp::min(*remaining, msg.len() as u64);
self.buffer.get_mut().extend_from_slice(msg[..max as usize].as_ref()); self.buffer.extend(msg.take().split_to(len as usize).into());
*remaining -= max as u64; *remaining -= len as u64;
Ok(*remaining == 0) Ok(*remaining == 0)
} else {
Ok(true)
}
}, },
} }
} }
@@ -766,7 +754,7 @@ impl TransferEncoding {
TransferEncodingKind::Chunked(ref mut eof) => { TransferEncodingKind::Chunked(ref mut eof) => {
if !*eof { if !*eof {
*eof = true; *eof = true;
self.buffer.get_mut().extend_from_slice(b"0\r\n\r\n"); self.buffer.extend_from_slice(b"0\r\n\r\n");
} }
}, },
} }
@@ -777,7 +765,7 @@ impl io::Write for TransferEncoding {
#[inline] #[inline]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> { fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.encode(buf)?; self.encode(Binary::from_slice(buf))?;
Ok(buf.len()) Ok(buf.len())
} }
@@ -863,8 +851,8 @@ mod tests {
fn test_chunked_te() { fn test_chunked_te() {
let bytes = SharedBytes::default(); let bytes = SharedBytes::default();
let mut enc = TransferEncoding::chunked(bytes.clone()); let mut enc = TransferEncoding::chunked(bytes.clone());
assert!(!enc.encode(b"test").ok().unwrap()); assert!(!enc.encode(Binary::from(b"test".as_ref())).ok().unwrap());
assert!(enc.encode(b"").ok().unwrap()); assert!(enc.encode(Binary::from(b"".as_ref())).ok().unwrap());
assert_eq!(bytes.get_mut().take().freeze(), assert_eq!(bytes.get_mut().take().freeze(),
Bytes::from_static(b"4\r\ntest\r\n0\r\n\r\n")); Bytes::from_static(b"4\r\ntest\r\n0\r\n\r\n"));
} }

View File

@@ -96,12 +96,12 @@ impl<T, H> Http1<T, H>
} }
} }
// TODO: refacrtor // TODO: refactor
#[cfg_attr(feature = "cargo-clippy", allow(cyclomatic_complexity))] #[cfg_attr(feature = "cargo-clippy", allow(cyclomatic_complexity))]
pub fn poll(&mut self) -> Poll<(), ()> { pub fn poll(&mut self) -> Poll<(), ()> {
// keep-alive timer // keep-alive timer
if self.keepalive_timer.is_some() { if let Some(ref mut timer) = self.keepalive_timer {
match self.keepalive_timer.as_mut().unwrap().poll() { match timer.poll() {
Ok(Async::Ready(_)) => { Ok(Async::Ready(_)) => {
trace!("Keep-alive timeout, close connection"); trace!("Keep-alive timeout, close connection");
return Ok(Async::Ready(())) return Ok(Async::Ready(()))
@@ -133,7 +133,7 @@ impl<T, H> Http1<T, H>
Ok(Async::Ready(ready)) => { Ok(Async::Ready(ready)) => {
not_ready = false; not_ready = false;
// overide keep-alive state // override keep-alive state
if self.stream.keepalive() { if self.stream.keepalive() {
self.flags.insert(Flags::KEEPALIVE); self.flags.insert(Flags::KEEPALIVE);
} else { } else {
@@ -146,10 +146,8 @@ impl<T, H> Http1<T, H>
item.flags.insert(EntryFlags::FINISHED); item.flags.insert(EntryFlags::FINISHED);
} }
}, },
Ok(Async::NotReady) => {
// no more IO for this iteration // no more IO for this iteration
io = true; Ok(Async::NotReady) => io = true,
},
Err(err) => { Err(err) => {
// it is not possible to recover from error // it is not possible to recover from error
// during pipe handling, so just drop connection // during pipe handling, so just drop connection
@@ -227,38 +225,7 @@ impl<T, H> Http1<T, H>
self.tasks.push_back( self.tasks.push_back(
Entry {pipe: pipe.unwrap_or_else(|| Pipeline::error(HTTPNotFound)), Entry {pipe: pipe.unwrap_or_else(|| Pipeline::error(HTTPNotFound)),
flags: EntryFlags::empty()}); flags: EntryFlags::empty()});
}
Err(ReaderError::Disconnect) => {
not_ready = false;
self.flags.insert(Flags::ERROR);
self.stream.disconnected();
for entry in &mut self.tasks {
entry.pipe.disconnected()
}
}, },
Err(err) => {
// notify all tasks
not_ready = false;
self.stream.disconnected();
for entry in &mut self.tasks {
entry.pipe.disconnected()
}
// kill keepalive
self.flags.remove(Flags::KEEPALIVE);
self.keepalive_timer.take();
// on parse error, stop reading stream but tasks need to be completed
self.flags.insert(Flags::ERROR);
if self.tasks.is_empty() {
if let ReaderError::Error(err) = err {
self.tasks.push_back(
Entry {pipe: Pipeline::error(err.error_response()),
flags: EntryFlags::empty()});
}
}
}
Ok(Async::NotReady) => { Ok(Async::NotReady) => {
// start keep-alive timer, this also is slow request timeout // start keep-alive timer, this also is slow request timeout
if self.tasks.is_empty() { if self.tasks.is_empty() {
@@ -293,7 +260,38 @@ impl<T, H> Http1<T, H>
} }
} }
break break
},
Err(ReaderError::Disconnect) => {
not_ready = false;
self.flags.insert(Flags::ERROR);
self.stream.disconnected();
for entry in &mut self.tasks {
entry.pipe.disconnected()
} }
},
Err(err) => {
// notify all tasks
not_ready = false;
self.stream.disconnected();
for entry in &mut self.tasks {
entry.pipe.disconnected()
}
// kill keepalive
self.flags.remove(Flags::KEEPALIVE);
self.keepalive_timer.take();
// on parse error, stop reading stream but tasks need to be completed
self.flags.insert(Flags::ERROR);
if self.tasks.is_empty() {
if let ReaderError::Error(err) = err {
self.tasks.push_back(
Entry {pipe: Pipeline::error(err.error_response()),
flags: EntryFlags::empty()});
}
}
},
} }
} }
@@ -1204,6 +1202,7 @@ mod tests {
panic!("Error"); panic!("Error");
} }
// type in chunked
let mut buf = Buffer::new( let mut buf = Buffer::new(
"GET /test HTTP/1.1\r\n\ "GET /test HTTP/1.1\r\n\
transfer-encoding: chnked\r\n\r\n"); transfer-encoding: chnked\r\n\r\n");

View File

@@ -2,15 +2,15 @@ use std::io;
use bytes::BufMut; use bytes::BufMut;
use futures::{Async, Poll}; use futures::{Async, Poll};
use tokio_io::AsyncWrite; use tokio_io::AsyncWrite;
use http::Version; use http::{Method, Version};
use http::header::{HeaderValue, CONNECTION, DATE}; use http::header::{HeaderValue, CONNECTION, DATE};
use helpers; use helpers;
use body::Body; use body::{Body, Binary};
use helpers::SharedBytes;
use httprequest::HttpMessage; use httprequest::HttpMessage;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE}; use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE};
use super::shared::SharedBytes;
use super::encoding::PayloadEncoder; use super::encoding::PayloadEncoder;
const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific
@@ -56,23 +56,25 @@ impl<T: AsyncWrite> H1Writer<T> {
} }
pub fn disconnected(&mut self) { pub fn disconnected(&mut self) {
self.encoder.get_mut().take(); self.buffer.take();
} }
pub fn keepalive(&self) -> bool { pub fn keepalive(&self) -> bool {
self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE) self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE)
} }
fn write_to_stream(&mut self) -> Result<WriterState, io::Error> { fn write_to_stream(&mut self) -> io::Result<WriterState> {
let buffer = self.encoder.get_mut(); while !self.buffer.is_empty() {
match self.stream.write(self.buffer.as_ref()) {
while !buffer.is_empty() { Ok(0) => {
match self.stream.write(buffer.as_ref()) { self.disconnected();
return Ok(WriterState::Done);
},
Ok(n) => { Ok(n) => {
let _ = buffer.split_to(n); let _ = self.buffer.split_to(n);
}, },
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if buffer.len() > MAX_WRITE_BUFFER_SIZE { if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
return Ok(WriterState::Pause) return Ok(WriterState::Pause)
} else { } else {
return Ok(WriterState::Done) return Ok(WriterState::Done)
@@ -92,23 +94,7 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
self.written self.written
} }
#[inline] fn start(&mut self, req: &mut HttpMessage, msg: &mut HttpResponse) -> io::Result<WriterState> {
fn flush(&mut self) -> Poll<(), io::Error> {
match self.stream.flush() {
Ok(_) => Ok(Async::Ready(())),
Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock {
Ok(Async::NotReady)
} else {
Err(e)
}
}
}
}
fn start(&mut self, req: &mut HttpMessage, msg: &mut HttpResponse)
-> Result<WriterState, io::Error>
{
// prepare task // prepare task
self.flags.insert(Flags::STARTED); self.flags.insert(Flags::STARTED);
self.encoder = PayloadEncoder::new(self.buffer.clone(), req, msg); self.encoder = PayloadEncoder::new(self.buffer.clone(), req, msg);
@@ -133,7 +119,7 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
// render message // render message
{ {
let mut buffer = self.encoder.get_mut(); let mut buffer = self.buffer.get_mut();
if let Body::Binary(ref bytes) = body { if let Body::Binary(ref bytes) = body {
buffer.reserve(256 + msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len()); buffer.reserve(256 + msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len());
} else { } else {
@@ -146,7 +132,11 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
match body { match body {
Body::Empty => Body::Empty =>
buffer.extend_from_slice(b"\r\ncontent-length: 0\r\n"), if req.method != Method::HEAD {
buffer.extend_from_slice(b"\r\ncontent-length: 0\r\n");
} else {
buffer.extend_from_slice(b"\r\n");
},
Body::Binary(ref bytes) => Body::Binary(ref bytes) =>
helpers::write_content_length(bytes.len(), &mut buffer), helpers::write_content_length(bytes.len(), &mut buffer),
_ => _ =>
@@ -176,14 +166,14 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
if let Body::Binary(bytes) = body { if let Body::Binary(bytes) = body {
self.written = bytes.len() as u64; self.written = bytes.len() as u64;
self.encoder.write(bytes.as_ref())?; self.encoder.write(bytes)?;
} else { } else {
msg.replace_body(body); msg.replace_body(body);
} }
Ok(WriterState::Done) Ok(WriterState::Done)
} }
fn write(&mut self, payload: &[u8]) -> Result<WriterState, io::Error> { fn write(&mut self, payload: Binary) -> io::Result<WriterState> {
self.written += payload.len() as u64; self.written += payload.len() as u64;
if !self.flags.contains(Flags::DISCONNECTED) { if !self.flags.contains(Flags::DISCONNECTED) {
if self.flags.contains(Flags::STARTED) { if self.flags.contains(Flags::STARTED) {
@@ -192,24 +182,24 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
return Ok(WriterState::Done) return Ok(WriterState::Done)
} else { } else {
// might be response to EXCEPT // might be response to EXCEPT
self.encoder.get_mut().extend_from_slice(payload) self.buffer.extend_from_slice(payload.as_ref())
} }
} }
if self.encoder.len() > MAX_WRITE_BUFFER_SIZE { if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
Ok(WriterState::Pause) Ok(WriterState::Pause)
} else { } else {
Ok(WriterState::Done) Ok(WriterState::Done)
} }
} }
fn write_eof(&mut self) -> Result<WriterState, io::Error> { fn write_eof(&mut self) -> io::Result<WriterState> {
self.encoder.write_eof()?; self.encoder.write_eof()?;
if !self.encoder.is_eof() { if !self.encoder.is_eof() {
Err(io::Error::new(io::ErrorKind::Other, Err(io::Error::new(io::ErrorKind::Other,
"Last payload item, but eof is not reached")) "Last payload item, but eof is not reached"))
} else if self.encoder.len() > MAX_WRITE_BUFFER_SIZE { } else if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
Ok(WriterState::Pause) Ok(WriterState::Pause)
} else { } else {
Ok(WriterState::Done) Ok(WriterState::Done)

View File

@@ -7,11 +7,11 @@ use http::{Version, HttpTryFrom, Response};
use http::header::{HeaderValue, CONNECTION, TRANSFER_ENCODING, DATE, CONTENT_LENGTH}; use http::header::{HeaderValue, CONNECTION, TRANSFER_ENCODING, DATE, CONTENT_LENGTH};
use helpers; use helpers;
use body::Body; use body::{Body, Binary};
use helpers::SharedBytes;
use httprequest::HttpMessage; use httprequest::HttpMessage;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
use super::encoding::PayloadEncoder; use super::encoding::PayloadEncoder;
use super::shared::SharedBytes;
use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE}; use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE};
const CHUNK_SIZE: usize = 16_384; const CHUNK_SIZE: usize = 16_384;
@@ -52,15 +52,13 @@ impl H2Writer {
} }
} }
fn write_to_stream(&mut self) -> Result<WriterState, io::Error> { fn write_to_stream(&mut self) -> io::Result<WriterState> {
if !self.flags.contains(Flags::STARTED) { if !self.flags.contains(Flags::STARTED) {
return Ok(WriterState::Done) return Ok(WriterState::Done)
} }
if let Some(ref mut stream) = self.stream { if let Some(ref mut stream) = self.stream {
let buffer = self.encoder.get_mut(); if self.buffer.is_empty() {
if buffer.is_empty() {
if self.flags.contains(Flags::EOF) { if self.flags.contains(Flags::EOF) {
let _ = stream.send_data(Bytes::new(), true); let _ = stream.send_data(Bytes::new(), true);
} }
@@ -70,7 +68,7 @@ impl H2Writer {
loop { loop {
match stream.poll_capacity() { match stream.poll_capacity() {
Ok(Async::NotReady) => { Ok(Async::NotReady) => {
if buffer.len() > MAX_WRITE_BUFFER_SIZE { if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
return Ok(WriterState::Pause) return Ok(WriterState::Pause)
} else { } else {
return Ok(WriterState::Done) return Ok(WriterState::Done)
@@ -80,15 +78,15 @@ impl H2Writer {
return Ok(WriterState::Done) return Ok(WriterState::Done)
} }
Ok(Async::Ready(Some(cap))) => { Ok(Async::Ready(Some(cap))) => {
let len = buffer.len(); let len = self.buffer.len();
let bytes = buffer.split_to(cmp::min(cap, len)); let bytes = self.buffer.split_to(cmp::min(cap, len));
let eof = buffer.is_empty() && self.flags.contains(Flags::EOF); let eof = self.buffer.is_empty() && self.flags.contains(Flags::EOF);
self.written += bytes.len() as u64; self.written += bytes.len() as u64;
if let Err(err) = stream.send_data(bytes.freeze(), eof) { if let Err(err) = stream.send_data(bytes.freeze(), eof) {
return Err(io::Error::new(io::ErrorKind::Other, err)) return Err(io::Error::new(io::ErrorKind::Other, err))
} else if !buffer.is_empty() { } else if !self.buffer.is_empty() {
let cap = cmp::min(buffer.len(), CHUNK_SIZE); let cap = cmp::min(self.buffer.len(), CHUNK_SIZE);
stream.reserve_capacity(cap); stream.reserve_capacity(cap);
} else { } else {
return Ok(WriterState::Pause) return Ok(WriterState::Pause)
@@ -110,16 +108,7 @@ impl Writer for H2Writer {
self.written self.written
} }
#[inline] fn start(&mut self, req: &mut HttpMessage, msg: &mut HttpResponse) -> io::Result<WriterState> {
fn flush(&mut self) -> Poll<(), io::Error> {
Ok(Async::Ready(()))
}
fn start(&mut self, req: &mut HttpMessage, msg: &mut HttpResponse)
-> Result<WriterState, io::Error>
{
// trace!("Prepare response with status: {:?}", msg.status());
// prepare response // prepare response
self.flags.insert(Flags::STARTED); self.flags.insert(Flags::STARTED);
self.encoder = PayloadEncoder::new(self.buffer.clone(), req, msg); self.encoder = PayloadEncoder::new(self.buffer.clone(), req, msg);
@@ -172,9 +161,9 @@ impl Writer for H2Writer {
if let Body::Binary(bytes) = body { if let Body::Binary(bytes) = body {
self.flags.insert(Flags::EOF); self.flags.insert(Flags::EOF);
self.written = bytes.len() as u64; self.written = bytes.len() as u64;
self.encoder.write(bytes.as_ref())?; self.encoder.write(bytes)?;
if let Some(ref mut stream) = self.stream { if let Some(ref mut stream) = self.stream {
stream.reserve_capacity(cmp::min(self.encoder.len(), CHUNK_SIZE)); stream.reserve_capacity(cmp::min(self.buffer.len(), CHUNK_SIZE));
} }
Ok(WriterState::Pause) Ok(WriterState::Pause)
} else { } else {
@@ -183,7 +172,7 @@ impl Writer for H2Writer {
} }
} }
fn write(&mut self, payload: &[u8]) -> Result<WriterState, io::Error> { fn write(&mut self, payload: Binary) -> io::Result<WriterState> {
self.written = payload.len() as u64; self.written = payload.len() as u64;
if !self.flags.contains(Flags::DISCONNECTED) { if !self.flags.contains(Flags::DISCONNECTED) {
@@ -192,25 +181,25 @@ impl Writer for H2Writer {
self.encoder.write(payload)?; self.encoder.write(payload)?;
} else { } else {
// might be response for EXCEPT // might be response for EXCEPT
self.encoder.get_mut().extend_from_slice(payload) self.buffer.extend_from_slice(payload.as_ref())
} }
} }
if self.encoder.len() > MAX_WRITE_BUFFER_SIZE { if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
Ok(WriterState::Pause) Ok(WriterState::Pause)
} else { } else {
Ok(WriterState::Done) Ok(WriterState::Done)
} }
} }
fn write_eof(&mut self) -> Result<WriterState, io::Error> { fn write_eof(&mut self) -> io::Result<WriterState> {
self.encoder.write_eof()?; self.encoder.write_eof()?;
self.flags.insert(Flags::EOF); self.flags.insert(Flags::EOF);
if !self.encoder.is_eof() { if !self.encoder.is_eof() {
Err(io::Error::new(io::ErrorKind::Other, Err(io::Error::new(io::ErrorKind::Other,
"Last payload item, but eof is not reached")) "Last payload item, but eof is not reached"))
} else if self.encoder.len() > MAX_WRITE_BUFFER_SIZE { } else if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
Ok(WriterState::Pause) Ok(WriterState::Pause)
} else { } else {
Ok(WriterState::Done) Ok(WriterState::Done)

View File

@@ -15,11 +15,13 @@ mod h2;
mod h1writer; mod h1writer;
mod h2writer; mod h2writer;
mod settings; mod settings;
mod shared;
mod utils; mod utils;
pub use self::srv::HttpServer; pub use self::srv::HttpServer;
pub use self::settings::ServerSettings; pub use self::settings::ServerSettings;
use body::Binary;
use error::Error; use error::Error;
use httprequest::{HttpMessage, HttpRequest}; use httprequest::{HttpMessage, HttpRequest};
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
@@ -54,6 +56,12 @@ pub trait HttpHandler: 'static {
fn handle(&mut self, req: HttpRequest) -> Result<Box<HttpHandlerTask>, HttpRequest>; fn handle(&mut self, req: HttpRequest) -> Result<Box<HttpHandlerTask>, HttpRequest>;
} }
impl HttpHandler for Box<HttpHandler> {
fn handle(&mut self, req: HttpRequest) -> Result<Box<HttpHandlerTask>, HttpRequest> {
self.as_mut().handle(req)
}
}
pub trait HttpHandlerTask { pub trait HttpHandlerTask {
fn poll(&mut self) -> Poll<(), Error>; fn poll(&mut self) -> Poll<(), Error>;
@@ -90,14 +98,11 @@ pub enum WriterState {
pub trait Writer { pub trait Writer {
fn written(&self) -> u64; fn written(&self) -> u64;
fn start(&mut self, req: &mut HttpMessage, resp: &mut HttpResponse) fn start(&mut self, req: &mut HttpMessage, resp: &mut HttpResponse) -> io::Result<WriterState>;
-> Result<WriterState, io::Error>;
fn write(&mut self, payload: &[u8]) -> Result<WriterState, io::Error>; fn write(&mut self, payload: Binary) -> io::Result<WriterState>;
fn write_eof(&mut self) -> Result<WriterState, io::Error>; fn write_eof(&mut self) -> io::Result<WriterState>;
fn flush(&mut self) -> Poll<(), io::Error>;
fn poll_completed(&mut self, shutdown: bool) -> Poll<(), io::Error>; fn poll_completed(&mut self, shutdown: bool) -> Poll<(), io::Error>;
} }

View File

@@ -4,6 +4,7 @@ use std::cell::{Cell, RefCell, RefMut};
use helpers; use helpers;
use super::channel::Node; use super::channel::Node;
use super::shared::{SharedBytes, SharedBytesPool};
/// Various server settings /// Various server settings
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@@ -63,7 +64,7 @@ pub(crate) struct WorkerSettings<H> {
h: RefCell<Vec<H>>, h: RefCell<Vec<H>>,
enabled: bool, enabled: bool,
keep_alive: u64, keep_alive: u64,
bytes: Rc<helpers::SharedBytesPool>, bytes: Rc<SharedBytesPool>,
messages: Rc<helpers::SharedMessagePool>, messages: Rc<helpers::SharedMessagePool>,
channels: Cell<usize>, channels: Cell<usize>,
node: Node<()>, node: Node<()>,
@@ -75,7 +76,7 @@ impl<H> WorkerSettings<H> {
h: RefCell::new(h), h: RefCell::new(h),
enabled: if let Some(ka) = keep_alive { ka > 0 } else { false }, enabled: if let Some(ka) = keep_alive { ka > 0 } else { false },
keep_alive: keep_alive.unwrap_or(0), keep_alive: keep_alive.unwrap_or(0),
bytes: Rc::new(helpers::SharedBytesPool::new()), bytes: Rc::new(SharedBytesPool::new()),
messages: Rc::new(helpers::SharedMessagePool::new()), messages: Rc::new(helpers::SharedMessagePool::new()),
channels: Cell::new(0), channels: Cell::new(0),
node: Node::head(), node: Node::head(),
@@ -102,8 +103,8 @@ impl<H> WorkerSettings<H> {
self.enabled self.enabled
} }
pub fn get_shared_bytes(&self) -> helpers::SharedBytes { pub fn get_shared_bytes(&self) -> SharedBytes {
helpers::SharedBytes::new(self.bytes.get_bytes(), Rc::clone(&self.bytes)) SharedBytes::new(self.bytes.get_bytes(), Rc::clone(&self.bytes))
} }
pub fn get_http_message(&self) -> helpers::SharedHttpMessage { pub fn get_http_message(&self) -> helpers::SharedHttpMessage {

120
src/server/shared.rs Normal file
View File

@@ -0,0 +1,120 @@
use std::mem;
use std::cell::RefCell;
use std::rc::Rc;
use std::collections::VecDeque;
use bytes::BytesMut;
use body::Binary;
/// Internal use only! unsafe
#[derive(Debug)]
pub(crate) struct SharedBytesPool(RefCell<VecDeque<Rc<BytesMut>>>);
impl SharedBytesPool {
pub fn new() -> SharedBytesPool {
SharedBytesPool(RefCell::new(VecDeque::with_capacity(128)))
}
pub fn get_bytes(&self) -> Rc<BytesMut> {
if let Some(bytes) = self.0.borrow_mut().pop_front() {
bytes
} else {
Rc::new(BytesMut::new())
}
}
pub fn release_bytes(&self, mut bytes: Rc<BytesMut>) {
let v = &mut self.0.borrow_mut();
if v.len() < 128 {
Rc::get_mut(&mut bytes).unwrap().take();
v.push_front(bytes);
}
}
}
#[derive(Debug)]
pub(crate) struct SharedBytes(
Option<Rc<BytesMut>>, Option<Rc<SharedBytesPool>>);
impl Drop for SharedBytes {
fn drop(&mut self) {
if let Some(ref pool) = self.1 {
if let Some(bytes) = self.0.take() {
if Rc::strong_count(&bytes) == 1 {
pool.release_bytes(bytes);
}
}
}
}
}
impl SharedBytes {
pub fn empty() -> Self {
SharedBytes(None, None)
}
pub fn new(bytes: Rc<BytesMut>, pool: Rc<SharedBytesPool>) -> SharedBytes {
SharedBytes(Some(bytes), Some(pool))
}
#[inline(always)]
#[allow(mutable_transmutes)]
#[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref, inline_always))]
pub fn get_mut(&self) -> &mut BytesMut {
let r: &BytesMut = self.0.as_ref().unwrap().as_ref();
unsafe{mem::transmute(r)}
}
#[inline]
pub fn len(&self) -> usize {
self.0.as_ref().unwrap().len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.0.as_ref().unwrap().is_empty()
}
#[inline]
pub fn as_ref(&self) -> &[u8] {
self.0.as_ref().unwrap().as_ref()
}
pub fn split_to(&self, n: usize) -> BytesMut {
self.get_mut().split_to(n)
}
pub fn take(&self) -> BytesMut {
self.get_mut().take()
}
#[inline]
pub fn reserve(&self, cnt: usize) {
self.get_mut().reserve(cnt)
}
#[inline]
#[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))]
pub fn extend(&self, data: Binary) {
self.get_mut().extend_from_slice(data.as_ref());
}
#[inline]
pub fn extend_from_slice(&self, data: &[u8]) {
self.get_mut().extend_from_slice(data);
}
}
impl Default for SharedBytes {
fn default() -> Self {
SharedBytes(Some(Rc::new(BytesMut::new())), None)
}
}
impl Clone for SharedBytes {
fn clone(&self) -> SharedBytes {
SharedBytes(self.0.clone(), self.1.clone())
}
}

View File

@@ -268,9 +268,9 @@ impl<H: HttpHandler, U, V> HttpServer<TcpStream, net::SocketAddr, H, U>
where U: IntoIterator<Item=V> + 'static, where U: IntoIterator<Item=V> + 'static,
V: IntoHttpHandler<Handler=H>, V: IntoHttpHandler<Handler=H>,
{ {
/// Start listening for incomming connections. /// Start listening for incoming connections.
/// ///
/// This method starts number of http handler workers in seperate threads. /// This method starts number of http handler workers in separate threads.
/// For each address this method starts separate thread which does `accept()` in a loop. /// For each address this method starts separate thread which does `accept()` in a loop.
/// ///
/// This methods panics if no socket addresses get bound. /// This methods panics if no socket addresses get bound.
@@ -298,7 +298,7 @@ impl<H: HttpHandler, U, V> HttpServer<TcpStream, net::SocketAddr, H, U>
pub fn start(mut self) -> SyncAddress<Self> pub fn start(mut self) -> SyncAddress<Self>
{ {
if self.sockets.is_empty() { if self.sockets.is_empty() {
panic!("HttpServer::bind() has to be called befor start()"); panic!("HttpServer::bind() has to be called before start()");
} else { } else {
let addrs: Vec<(net::SocketAddr, net::TcpListener)> = let addrs: Vec<(net::SocketAddr, net::TcpListener)> =
self.sockets.drain().collect(); self.sockets.drain().collect();
@@ -320,7 +320,7 @@ impl<H: HttpHandler, U, V> HttpServer<TcpStream, net::SocketAddr, H, U>
} }
} }
/// Spawn new thread and start listening for incomming connections. /// Spawn new thread and start listening for incoming connections.
/// ///
/// This method spawns new thread and starts new actix system. Other than that it is /// This method spawns new thread and starts new actix system. Other than that it is
/// similar to `start()` method. This method blocks. /// similar to `start()` method. This method blocks.
@@ -359,7 +359,7 @@ impl<H: HttpHandler, U, V> HttpServer<TlsStream<TcpStream>, net::SocketAddr, H,
where U: IntoIterator<Item=V> + 'static, where U: IntoIterator<Item=V> + 'static,
V: IntoHttpHandler<Handler=H>, V: IntoHttpHandler<Handler=H>,
{ {
/// Start listening for incomming tls connections. /// Start listening for incoming tls connections.
pub fn start_tls(mut self, pkcs12: ::Pkcs12) -> io::Result<SyncAddress<Self>> { pub fn start_tls(mut self, pkcs12: ::Pkcs12) -> io::Result<SyncAddress<Self>> {
if self.sockets.is_empty() { if self.sockets.is_empty() {
Err(io::Error::new(io::ErrorKind::Other, "No socket addresses are bound")) Err(io::Error::new(io::ErrorKind::Other, "No socket addresses are bound"))
@@ -398,7 +398,7 @@ impl<H: HttpHandler, U, V> HttpServer<SslStream<TcpStream>, net::SocketAddr, H,
where U: IntoIterator<Item=V> + 'static, where U: IntoIterator<Item=V> + 'static,
V: IntoHttpHandler<Handler=H>, V: IntoHttpHandler<Handler=H>,
{ {
/// Start listening for incomming tls connections. /// Start listening for incoming tls connections.
/// ///
/// This method sets alpn protocols to "h2" and "http/1.1" /// This method sets alpn protocols to "h2" and "http/1.1"
pub fn start_ssl(mut self, identity: &ParsedPkcs12) -> io::Result<SyncAddress<Self>> { pub fn start_ssl(mut self, identity: &ParsedPkcs12) -> io::Result<SyncAddress<Self>> {
@@ -443,7 +443,7 @@ impl<T, A, H, U, V> HttpServer<WrapperStream<T>, A, H, U>
U: IntoIterator<Item=V> + 'static, U: IntoIterator<Item=V> + 'static,
V: IntoHttpHandler<Handler=H>, V: IntoHttpHandler<Handler=H>,
{ {
/// Start listening for incomming connections from a stream. /// Start listening for incoming connections from a stream.
/// ///
/// This method uses only one thread for handling incoming connections. /// This method uses only one thread for handling incoming connections.
pub fn start_incoming<S>(mut self, stream: S, secure: bool) -> SyncAddress<Self> pub fn start_incoming<S>(mut self, stream: S, secure: bool) -> SyncAddress<Self>
@@ -663,7 +663,7 @@ fn start_accept_thread(sock: net::TcpListener, addr: net::SocketAddr, backlog: i
} }
} }
// Start listening for incommin commands // Start listening for incoming commands
if let Err(err) = poll.register(&reg, CMD, if let Err(err) = poll.register(&reg, CMD,
mio::Ready::readable(), mio::PollOpt::edge()) { mio::Ready::readable(), mio::PollOpt::edge()) {
panic!("Can not register Registration: {}", err); panic!("Can not register Registration: {}", err);

View File

@@ -29,7 +29,7 @@ use server::{HttpServer, HttpHandler, IntoHttpHandler, ServerSettings};
/// The `TestServer` type. /// The `TestServer` type.
/// ///
/// `TestServer` is very simple test server that simplify process of writing /// `TestServer` is very simple test server that simplify process of writing
/// integrational tests cases for actix web applications. /// integration tests cases for actix web applications.
/// ///
/// # Examples /// # Examples
/// ///
@@ -61,7 +61,7 @@ impl TestServer {
/// Start new test server /// Start new test server
/// ///
/// This methos accepts configuration method. You can add /// This method accepts configuration method. You can add
/// middlewares or set handlers for test application. /// middlewares or set handlers for test application.
pub fn new<F>(config: F) -> Self pub fn new<F>(config: F) -> Self
where F: Sync + Send + 'static + Fn(&mut TestApp<()>), where F: Sync + Send + 'static + Fn(&mut TestApp<()>),
@@ -101,7 +101,7 @@ impl TestServer {
/// Start new test server with custom application state /// Start new test server with custom application state
/// ///
/// This methos accepts state factory and configuration method. /// This method accepts state factory and configuration method.
pub fn with_state<S, FS, F>(state: FS, config: F) -> Self pub fn with_state<S, FS, F>(state: FS, config: F) -> Self
where S: 'static, where S: 'static,
FS: Sync + Send + 'static + Fn() -> S, FS: Sync + Send + 'static + Fn() -> S,
@@ -287,12 +287,12 @@ impl Default for TestRequest<()> {
impl TestRequest<()> { impl TestRequest<()> {
/// Create TestReqeust and set request uri /// Create TestRequest and set request uri
pub fn with_uri(path: &str) -> TestRequest<()> { pub fn with_uri(path: &str) -> TestRequest<()> {
TestRequest::default().uri(path) TestRequest::default().uri(path)
} }
/// Create TestReqeust and set header /// Create TestRequest and set header
pub fn with_header<K, V>(key: K, value: V) -> TestRequest<()> pub fn with_header<K, V>(key: K, value: V) -> TestRequest<()>
where HeaderName: HttpTryFrom<K>, where HeaderName: HttpTryFrom<K>,
HeaderValue: HttpTryFrom<V> HeaderValue: HttpTryFrom<V>

View File

@@ -1,8 +1,8 @@
use std::mem; use std::mem;
use std::collections::VecDeque;
use futures::{Async, Poll}; use futures::{Async, Poll};
use futures::sync::oneshot::Sender; use futures::sync::oneshot::Sender;
use futures::unsync::oneshot; use futures::unsync::oneshot;
use smallvec::SmallVec;
use actix::{Actor, ActorState, ActorContext, AsyncContext, use actix::{Actor, ActorState, ActorContext, AsyncContext,
Address, SyncAddress, Handler, Subscriber, ResponseType, SpawnHandle}; Address, SyncAddress, Handler, Subscriber, ResponseType, SpawnHandle};
@@ -23,7 +23,7 @@ use ws::proto::{OpCode, CloseCode};
pub struct WebsocketContext<A, S=()> where A: Actor<Context=WebsocketContext<A, S>>, pub struct WebsocketContext<A, S=()> where A: Actor<Context=WebsocketContext<A, S>>,
{ {
inner: ContextImpl<A>, inner: ContextImpl<A>,
stream: VecDeque<ContextFrame>, stream: Option<SmallVec<[ContextFrame; 2]>>,
request: HttpRequest<S>, request: HttpRequest<S>,
disconnected: bool, disconnected: bool,
} }
@@ -88,7 +88,7 @@ impl<A, S: 'static> WebsocketContext<A, S> where A: Actor<Context=Self> {
pub fn from_request(req: HttpRequest<S>) -> WebsocketContext<A, S> { pub fn from_request(req: HttpRequest<S>) -> WebsocketContext<A, S> {
WebsocketContext { WebsocketContext {
inner: ContextImpl::new(None), inner: ContextImpl::new(None),
stream: VecDeque::new(), stream: None,
request: req, request: req,
disconnected: false, disconnected: false,
} }
@@ -107,7 +107,7 @@ impl<A, S> WebsocketContext<A, S> where A: Actor<Context=Self> {
#[inline] #[inline]
fn write<B: Into<Binary>>(&mut self, data: B) { fn write<B: Into<Binary>>(&mut self, data: B) {
if !self.disconnected { if !self.disconnected {
self.stream.push_back(ContextFrame::Chunk(Some(data.into()))); self.add_frame(ContextFrame::Chunk(Some(data.into())));
} else { } else {
warn!("Trying to write to disconnected response"); warn!("Trying to write to disconnected response");
} }
@@ -173,7 +173,7 @@ impl<A, S> WebsocketContext<A, S> where A: Actor<Context=Self> {
pub fn drain(&mut self) -> Drain<A> { pub fn drain(&mut self) -> Drain<A> {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
self.inner.modify(); self.inner.modify();
self.stream.push_back(ContextFrame::Drain(tx)); self.add_frame(ContextFrame::Drain(tx));
Drain::new(rx) Drain::new(rx)
} }
@@ -182,6 +182,13 @@ impl<A, S> WebsocketContext<A, S> where A: Actor<Context=Self> {
pub fn connected(&self) -> bool { pub fn connected(&self) -> bool {
!self.disconnected !self.disconnected
} }
fn add_frame(&mut self, frame: ContextFrame) {
if self.stream.is_none() {
self.stream = Some(SmallVec::new());
}
self.stream.as_mut().map(|s| s.push(frame));
}
} }
impl<A, S> WebsocketContext<A, S> where A: Actor<Context=Self> { impl<A, S> WebsocketContext<A, S> where A: Actor<Context=Self> {
@@ -212,7 +219,7 @@ impl<A, S> ActorHttpContext for WebsocketContext<A, S> where A: Actor<Context=Se
self.stop(); self.stop();
} }
fn poll(&mut self) -> Poll<Option<ContextFrame>, Error> { fn poll(&mut self) -> Poll<Option<SmallVec<[ContextFrame;2]>>, Error> {
let ctx: &mut WebsocketContext<A, S> = unsafe { let ctx: &mut WebsocketContext<A, S> = unsafe {
mem::transmute(self as &mut WebsocketContext<A, S>) mem::transmute(self as &mut WebsocketContext<A, S>)
}; };
@@ -225,8 +232,8 @@ impl<A, S> ActorHttpContext for WebsocketContext<A, S> where A: Actor<Context=Se
} }
// frames // frames
if let Some(frame) = self.stream.pop_front() { if let Some(data) = self.stream.take() {
Ok(Async::Ready(Some(frame))) Ok(Async::Ready(Some(data)))
} else if self.inner.alive() { } else if self.inner.alive() {
Ok(Async::NotReady) Ok(Async::NotReady)
} else { } else {

View File

@@ -5,14 +5,7 @@ use bytes::BytesMut;
use body::Binary; use body::Binary;
use ws::proto::{OpCode, CloseCode}; use ws::proto::{OpCode, CloseCode};
use ws::mask::apply_mask;
fn apply_mask(buf: &mut [u8], mask: &[u8; 4]) {
let iter = buf.iter_mut().zip(mask.iter().cycle());
for (byte, &key) in iter {
*byte ^= key
}
}
/// A struct representing a `WebSocket` frame. /// A struct representing a `WebSocket` frame.
#[derive(Debug)] #[derive(Debug)]
@@ -28,7 +21,7 @@ pub(crate) struct Frame {
impl Frame { impl Frame {
/// Desctructe frame /// Destruct frame
pub fn unpack(self) -> (bool, OpCode, Binary) { pub fn unpack(self) -> (bool, OpCode, Binary) {
(self.finished, self.opcode, self.payload) (self.finished, self.opcode, self.payload)
} }

120
src/ws/mask.rs Normal file
View File

@@ -0,0 +1,120 @@
//! This is code from [Tungstenite project](https://github.com/snapview/tungstenite-rs)
use std::cmp::min;
use std::mem::uninitialized;
use std::ptr::copy_nonoverlapping;
/// Mask/unmask a frame.
#[inline]
pub fn apply_mask(buf: &mut [u8], mask: &[u8; 4]) {
apply_mask_fast32(buf, mask)
}
/// A safe unoptimized mask application.
#[inline]
#[allow(dead_code)]
fn apply_mask_fallback(buf: &mut [u8], mask: &[u8; 4]) {
for (i, byte) in buf.iter_mut().enumerate() {
*byte ^= mask[i & 3];
}
}
/// Faster version of `apply_mask()` which operates on 4-byte blocks.
#[inline]
#[allow(dead_code)]
fn apply_mask_fast32(buf: &mut [u8], mask: &[u8; 4]) {
// TODO replace this with read_unaligned() as it stabilizes.
let mask_u32 = unsafe {
let mut m: u32 = uninitialized();
#[allow(trivial_casts)]
copy_nonoverlapping(mask.as_ptr(), &mut m as *mut _ as *mut u8, 4);
m
};
let mut ptr = buf.as_mut_ptr();
let mut len = buf.len();
// Possible first unaligned block.
let head = min(len, (4 - (ptr as usize & 3)) & 3);
let mask_u32 = if head > 0 {
unsafe {
xor_mem(ptr, mask_u32, head);
ptr = ptr.offset(head as isize);
}
len -= head;
if cfg!(target_endian = "big") {
mask_u32.rotate_left(8 * head as u32)
} else {
mask_u32.rotate_right(8 * head as u32)
}
} else {
mask_u32
};
if len > 0 {
debug_assert_eq!(ptr as usize % 4, 0);
}
// Properly aligned middle of the data.
while len > 4 {
unsafe {
*(ptr as *mut u32) ^= mask_u32;
ptr = ptr.offset(4);
len -= 4;
}
}
// Possible last block.
if len > 0 {
unsafe { xor_mem(ptr, mask_u32, len); }
}
}
#[inline]
// TODO: copy_nonoverlapping here compiles to call memcpy. While it is not so inefficient,
// it could be done better. The compiler does not see that len is limited to 3.
unsafe fn xor_mem(ptr: *mut u8, mask: u32, len: usize) {
let mut b: u32 = uninitialized();
#[allow(trivial_casts)]
copy_nonoverlapping(ptr, &mut b as *mut _ as *mut u8, len);
b ^= mask;
#[allow(trivial_casts)]
copy_nonoverlapping(&b as *const _ as *const u8, ptr, len);
}
#[cfg(test)]
mod tests {
use super::{apply_mask_fallback, apply_mask_fast32};
#[test]
fn test_apply_mask() {
let mask = [
0x6d, 0xb6, 0xb2, 0x80,
];
let unmasked = vec![
0xf3, 0x00, 0x01, 0x02, 0x03, 0x80, 0x81, 0x82,
0xff, 0xfe, 0x00, 0x17, 0x74, 0xf9, 0x12, 0x03,
];
// Check masking with proper alignment.
{
let mut masked = unmasked.clone();
apply_mask_fallback(&mut masked, &mask);
let mut masked_fast = unmasked.clone();
apply_mask_fast32(&mut masked_fast, &mask);
assert_eq!(masked, masked_fast);
}
// Check masking without alignment.
{
let mut masked = unmasked.clone();
apply_mask_fallback(&mut masked[1..], &mask);
let mut masked_fast = unmasked.clone();
apply_mask_fast32(&mut masked_fast[1..], &mask);
assert_eq!(masked, masked_fast);
}
}
}

View File

@@ -58,6 +58,7 @@ use httpresponse::{ConnectionType, HttpResponse, HttpResponseBuilder};
mod frame; mod frame;
mod proto; mod proto;
mod context; mod context;
mod mask;
use ws::frame::Frame; use ws::frame::Frame;
use ws::proto::{hash_key, OpCode}; use ws::proto::{hash_key, OpCode};

View File

@@ -152,6 +152,66 @@ fn test_body_br_streaming() {
assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref())); assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref()));
} }
#[test]
fn test_head_empty() {
let srv = test::TestServer::new(
|app| app.handler(|_| {
httpcodes::HTTPOk.build()
.content_length(STR.len() as u64).finish()}));
let client = reqwest::Client::new();
let mut res = client.head(&srv.url("/")).send().unwrap();
assert!(res.status().is_success());
let mut bytes = BytesMut::with_capacity(2048).writer();
let len = res.headers()
.get::<reqwest::header::ContentLength>().map(|ct_len| **ct_len).unwrap();
assert_eq!(len, STR.len() as u64);
let _ = res.copy_to(&mut bytes);
let bytes = bytes.into_inner();
assert!(bytes.is_empty());
}
#[test]
fn test_head_binary() {
let srv = test::TestServer::new(
|app| app.handler(|_| {
httpcodes::HTTPOk.build()
.content_encoding(headers::ContentEncoding::Identity)
.content_length(100).body(STR)}));
let client = reqwest::Client::new();
let mut res = client.head(&srv.url("/")).send().unwrap();
assert!(res.status().is_success());
let mut bytes = BytesMut::with_capacity(2048).writer();
let len = res.headers()
.get::<reqwest::header::ContentLength>().map(|ct_len| **ct_len).unwrap();
assert_eq!(len, STR.len() as u64);
let _ = res.copy_to(&mut bytes);
let bytes = bytes.into_inner();
assert!(bytes.is_empty());
}
#[test]
fn test_head_binary2() {
let srv = test::TestServer::new(
|app| app.handler(|_| {
httpcodes::HTTPOk.build()
.content_encoding(headers::ContentEncoding::Identity)
.body(STR)
}));
let client = reqwest::Client::new();
let mut res = client.head(&srv.url("/")).send().unwrap();
assert!(res.status().is_success());
let mut bytes = BytesMut::with_capacity(2048).writer();
let len = res.headers()
.get::<reqwest::header::ContentLength>().map(|ct_len| **ct_len).unwrap();
assert_eq!(len, STR.len() as u64);
let _ = res.copy_to(&mut bytes);
let bytes = bytes.into_inner();
assert!(bytes.is_empty());
}
#[test] #[test]
fn test_body_length() { fn test_body_length() {
let srv = test::TestServer::new( let srv = test::TestServer::new(