mirror of
https://github.com/fafhrd91/actix-web
synced 2025-07-16 06:35:46 +02:00
Compare commits
34 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
2227120ae0 | ||
|
21c8c0371d | ||
|
1914a6a0d8 | ||
|
1cff4619e7 | ||
|
7bb7adf89c | ||
|
f55ff24925 | ||
|
f5f78d79e6 | ||
|
9180625dfd | ||
|
552320bae2 | ||
|
7cf221f767 | ||
|
98931a8623 | ||
|
ae10a89014 | ||
|
71d534dadb | ||
|
867bb1d409 | ||
|
91c44a1cf1 | ||
|
3bc60a8d5d | ||
|
58df8fa4b9 | ||
|
81f92b43e5 | ||
|
e1d9c3803b | ||
|
a7c24aace1 | ||
|
89a89e7b18 | ||
|
3425f7be40 | ||
|
09a6f8a34f | ||
|
7060f298b4 | ||
|
33dbe15760 | ||
|
e95c7dfc29 | ||
|
927a92fcac | ||
|
2b0f3d2a9a | ||
|
93fdb596d4 | ||
|
305666067e | ||
|
b805d87ee7 | ||
|
bc6bb9984f | ||
|
c043fd7912 | ||
|
781282897a |
@@ -46,7 +46,9 @@ script:
|
||||
cargo clean
|
||||
USE_SKEPTIC=1 cargo test --features=alpn
|
||||
else
|
||||
cargo test --features=alpn
|
||||
cargo clean
|
||||
cargo test
|
||||
# --features=alpn
|
||||
fi
|
||||
|
||||
- |
|
||||
|
21
CHANGES.md
21
CHANGES.md
@@ -1,5 +1,26 @@
|
||||
# Changes
|
||||
|
||||
## 0.3.2 (2018-01-21)
|
||||
|
||||
* Fix HEAD requests handling
|
||||
|
||||
* Log request processing errors
|
||||
|
||||
* Always enable content encoding if encoding explicitly selected
|
||||
|
||||
* Allow multiple Applications on a single server with different state #49
|
||||
|
||||
* CORS middleware: allowed_headers is defaulting to None #50
|
||||
|
||||
|
||||
## 0.3.1 (2018-01-13)
|
||||
|
||||
* Fix directory entry path #47
|
||||
|
||||
* Do not enable chunked encoding for HTTP/1.0
|
||||
|
||||
* Allow explicitly disable chunked encoding
|
||||
|
||||
|
||||
## 0.3.0 (2018-01-12)
|
||||
|
||||
|
10
Cargo.toml
10
Cargo.toml
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "actix-web"
|
||||
version = "0.3.0"
|
||||
version = "0.3.2"
|
||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
description = "Actix web framework"
|
||||
readme = "README.md"
|
||||
@@ -11,7 +11,8 @@ documentation = "https://docs.rs/actix-web/"
|
||||
categories = ["network-programming", "asynchronous",
|
||||
"web-programming::http-server", "web-programming::websocket"]
|
||||
license = "MIT/Apache-2.0"
|
||||
exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
|
||||
exclude = [".gitignore", ".travis.yml", ".cargo/config",
|
||||
"appveyor.yml", "/examples/**"]
|
||||
build = "build.rs"
|
||||
|
||||
[badges]
|
||||
@@ -57,9 +58,6 @@ num_cpus = "1.0"
|
||||
flate2 = "1.0"
|
||||
cookie = { version="0.10", features=["percent-encode", "secure"] }
|
||||
|
||||
# ring nightly compilation bug
|
||||
# cookie = { git="https://github.com/alexcrichton/cookie-rs.git", features=["percent-encode", "secure"] }
|
||||
|
||||
# io
|
||||
mio = "0.6"
|
||||
net2 = "0.2"
|
||||
@@ -83,7 +81,7 @@ version = "0.9"
|
||||
optional = true
|
||||
|
||||
[dev-dependencies]
|
||||
env_logger = "0.4"
|
||||
env_logger = "0.5"
|
||||
reqwest = "0.8"
|
||||
skeptic = "0.13"
|
||||
serde_derive = "1.0"
|
||||
|
@@ -6,6 +6,6 @@ workspace = "../.."
|
||||
|
||||
[dependencies]
|
||||
futures = "*"
|
||||
env_logger = "0.4"
|
||||
env_logger = "0.5"
|
||||
actix = "0.4"
|
||||
actix-web = { path = "../../" }
|
||||
actix-web = { path="../.." }
|
||||
|
@@ -7,6 +7,7 @@ extern crate env_logger;
|
||||
extern crate futures;
|
||||
use futures::Stream;
|
||||
|
||||
use std::{io, env};
|
||||
use actix_web::*;
|
||||
use actix_web::middleware::RequestSession;
|
||||
use futures::future::{FutureResult, result};
|
||||
@@ -56,17 +57,17 @@ fn index(mut req: HttpRequest) -> Result<HttpResponse> {
|
||||
fn p404(req: HttpRequest) -> Result<HttpResponse> {
|
||||
|
||||
// html
|
||||
let html = format!(r#"<!DOCTYPE html><html><head><title>actix - basics</title><link rel="shortcut icon" type="image/x-icon" href="/favicon.ico" /></head>
|
||||
let html = r#"<!DOCTYPE html><html><head><title>actix - basics</title><link rel="shortcut icon" type="image/x-icon" href="/favicon.ico" /></head>
|
||||
<body>
|
||||
<a href="index.html">back to home</a>
|
||||
<h1>404</h1>
|
||||
</body>
|
||||
</html>"#);
|
||||
</html>"#;
|
||||
|
||||
// response
|
||||
Ok(HttpResponse::build(StatusCode::NOT_FOUND)
|
||||
.content_type("text/html; charset=utf-8")
|
||||
.body(&html).unwrap())
|
||||
.body(html).unwrap())
|
||||
}
|
||||
|
||||
|
||||
@@ -92,8 +93,9 @@ fn with_param(req: HttpRequest) -> Result<HttpResponse>
|
||||
}
|
||||
|
||||
fn main() {
|
||||
::std::env::set_var("RUST_LOG", "actix_web=info");
|
||||
let _ = env_logger::init();
|
||||
env::set_var("RUST_LOG", "actix_web=debug");
|
||||
env::set_var("RUST_BACKTRACE", "1");
|
||||
env_logger::init();
|
||||
let sys = actix::System::new("basic-example");
|
||||
|
||||
let addr = HttpServer::new(
|
||||
@@ -121,6 +123,9 @@ fn main() {
|
||||
_ => httpcodes::HTTPNotFound,
|
||||
}
|
||||
}))
|
||||
.resource("/error.html", |r| r.f(|req| {
|
||||
error::ErrorBadRequest(io::Error::new(io::ErrorKind::Other, "test"))
|
||||
}))
|
||||
// static files
|
||||
.handler("/static/", fs::StaticFiles::new("../static/", true))
|
||||
// redirect
|
||||
|
@@ -5,9 +5,9 @@ authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
workspace = "../.."
|
||||
|
||||
[dependencies]
|
||||
env_logger = "0.4"
|
||||
env_logger = "0.5"
|
||||
actix = "0.4"
|
||||
actix-web = { git = "https://github.com/actix/actix-web" }
|
||||
actix-web = { path = "../../" }
|
||||
|
||||
futures = "0.1"
|
||||
uuid = { version = "0.5", features = ["serde", "v4"] }
|
||||
|
@@ -8,7 +8,7 @@ use diesel::prelude::*;
|
||||
use models;
|
||||
use schema;
|
||||
|
||||
/// This is db executor actor. We are going to run 3 of them in parallele.
|
||||
/// This is db executor actor. We are going to run 3 of them in parallel.
|
||||
pub struct DbExecutor(pub SqliteConnection);
|
||||
|
||||
/// This is only message that this actor can handle, but it is easy to extend number of
|
||||
|
@@ -5,6 +5,6 @@ authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
workspace = "../.."
|
||||
|
||||
[dependencies]
|
||||
env_logger = "0.4"
|
||||
env_logger = "0.5"
|
||||
actix = "0.4"
|
||||
actix-web = { path = "../../" }
|
||||
|
@@ -15,4 +15,4 @@ serde_derive = "1.0"
|
||||
json = "*"
|
||||
|
||||
actix = "0.4"
|
||||
actix-web = { git = "https://github.com/actix/actix-web" }
|
||||
actix-web = { path="../../" }
|
||||
|
@@ -12,4 +12,4 @@ path = "src/main.rs"
|
||||
env_logger = "*"
|
||||
futures = "0.1"
|
||||
actix = "0.4"
|
||||
actix-web = { git = "https://github.com/actix/actix-web" }
|
||||
actix-web = { path="../../" }
|
||||
|
@@ -6,6 +6,6 @@ workspace = "../.."
|
||||
|
||||
[dependencies]
|
||||
futures = "*"
|
||||
env_logger = "0.4"
|
||||
env_logger = "0.5"
|
||||
actix = "0.4"
|
||||
actix-web = { git = "https://github.com/actix/actix-web" }
|
||||
actix-web = { path = "../../" }
|
||||
|
@@ -1,5 +1,5 @@
|
||||
#![cfg_attr(feature="cargo-clippy", allow(needless_pass_by_value))]
|
||||
//! There are two level of statfulness in actix-web. Application has state
|
||||
//! There are two level of statefulness in actix-web. Application has state
|
||||
//! that is shared across all handlers within same Application.
|
||||
//! And individual handler can have state.
|
||||
|
||||
@@ -33,7 +33,7 @@ struct MyWebSocket {
|
||||
}
|
||||
|
||||
impl Actor for MyWebSocket {
|
||||
type Context = HttpContext<Self, AppState>;
|
||||
type Context = ws::WebsocketContext<Self, AppState>;
|
||||
}
|
||||
|
||||
impl Handler<ws::Message> for MyWebSocket {
|
||||
@@ -43,9 +43,9 @@ impl Handler<ws::Message> for MyWebSocket {
|
||||
self.counter += 1;
|
||||
println!("WS({}): {:?}", self.counter, msg);
|
||||
match msg {
|
||||
ws::Message::Ping(msg) => ws::WsWriter::pong(ctx, &msg),
|
||||
ws::Message::Text(text) => ws::WsWriter::text(ctx, &text),
|
||||
ws::Message::Binary(bin) => ws::WsWriter::binary(ctx, bin),
|
||||
ws::Message::Ping(msg) => ctx.pong(&msg),
|
||||
ws::Message::Text(text) => ctx.text(&text),
|
||||
ws::Message::Binary(bin) => ctx.binary(bin),
|
||||
ws::Message::Closed | ws::Message::Error => {
|
||||
ctx.stop();
|
||||
}
|
||||
|
@@ -5,7 +5,7 @@ authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
workspace = "../.."
|
||||
|
||||
[dependencies]
|
||||
env_logger = "0.4"
|
||||
env_logger = "0.5"
|
||||
actix = "0.4"
|
||||
actix-web = { git = "https://github.com/actix/actix-web" }
|
||||
actix-web = { path = "../../" }
|
||||
tera = "*"
|
||||
|
@@ -9,6 +9,6 @@ name = "server"
|
||||
path = "src/main.rs"
|
||||
|
||||
[dependencies]
|
||||
env_logger = "0.4"
|
||||
env_logger = "0.5"
|
||||
actix = "^0.4.2"
|
||||
actix-web = { git = "https://github.com/actix/actix-web", features=["alpn"] }
|
||||
actix-web = { path = "../../", features=["alpn"] }
|
||||
|
@@ -9,7 +9,7 @@ use std::io::Read;
|
||||
use actix_web::*;
|
||||
|
||||
|
||||
/// somple handle
|
||||
/// simple handle
|
||||
fn index(req: HttpRequest) -> Result<HttpResponse> {
|
||||
println!("{:?}", req);
|
||||
Ok(httpcodes::HTTPOk
|
||||
|
@@ -26,4 +26,4 @@ serde_json = "1.0"
|
||||
serde_derive = "1.0"
|
||||
|
||||
actix = "^0.4.2"
|
||||
actix-web = { git = "https://github.com/actix/actix-web" }
|
||||
actix-web = { path="../../" }
|
||||
|
@@ -16,8 +16,8 @@ Chat server listens for incoming tcp connections. Server can access several type
|
||||
* `\list` - list all available rooms
|
||||
* `\join name` - join room, if room does not exist, create new one
|
||||
* `\name name` - set session name
|
||||
* `some message` - just string, send messsage to all peers in same room
|
||||
* client has to send heartbeat `Ping` messages, if server does not receive a heartbeat message for 10 seconds connection gets droppped
|
||||
* `some message` - just string, send message to all peers in same room
|
||||
* client has to send heartbeat `Ping` messages, if server does not receive a heartbeat message for 10 seconds connection gets dropped
|
||||
|
||||
To start server use command: `cargo run --bin server`
|
||||
|
||||
|
@@ -16,7 +16,7 @@ use codec::{ChatRequest, ChatResponse, ChatCodec};
|
||||
#[derive(Message)]
|
||||
pub struct Message(pub String);
|
||||
|
||||
/// `ChatSession` actor is responsible for tcp peer communitions.
|
||||
/// `ChatSession` actor is responsible for tcp peer communications.
|
||||
pub struct ChatSession {
|
||||
/// unique session id
|
||||
id: usize,
|
||||
@@ -30,7 +30,7 @@ pub struct ChatSession {
|
||||
|
||||
impl Actor for ChatSession {
|
||||
/// For tcp communication we are going to use `FramedContext`.
|
||||
/// It is convinient wrapper around `Framed` object from `tokio_io`
|
||||
/// It is convenient wrapper around `Framed` object from `tokio_io`
|
||||
type Context = FramedContext<Self>;
|
||||
|
||||
fn started(&mut self, ctx: &mut Self::Context) {
|
||||
@@ -149,7 +149,7 @@ impl ChatSession {
|
||||
}
|
||||
|
||||
|
||||
/// Define tcp server that will accept incomint tcp connection and create
|
||||
/// Define tcp server that will accept incoming tcp connection and create
|
||||
/// chat actors.
|
||||
pub struct TcpServer {
|
||||
chat: SyncAddress<ChatServer>,
|
||||
|
@@ -17,7 +17,7 @@ If you already have rustup installed, run this command to ensure you have the la
|
||||
rustup update
|
||||
```
|
||||
|
||||
Actix web framework requies rust version 1.20 and up.
|
||||
Actix web framework requires rust version 1.20 and up.
|
||||
|
||||
## Running Examples
|
||||
|
||||
|
@@ -1,7 +1,7 @@
|
||||
# Middlewares
|
||||
|
||||
Actix middlewares system allows to add additional behaviour to request/response processing.
|
||||
Middleware can hook into incomnig request process and modify request or halt request
|
||||
Actix middlewares system allows to add additional behavior to request/response processing.
|
||||
Middleware can hook into incoming request process and modify request or halt request
|
||||
processing and return response early. Also it can hook into response processing.
|
||||
|
||||
Typically middlewares involves in following actions:
|
||||
@@ -12,9 +12,9 @@ Typically middlewares involves in following actions:
|
||||
* Access external services (redis, logging, sessions)
|
||||
|
||||
Middlewares are registered for each application and get executed in same order as
|
||||
registraton order. In general, *middleware* is a type that implements
|
||||
registration order. In general, *middleware* is a type that implements
|
||||
[*Middleware trait*](../actix_web/middlewares/trait.Middleware.html). Each method
|
||||
in this trait has default implementation. Each method can return result immidietly
|
||||
in this trait has default implementation. Each method can return result immediately
|
||||
or *future* object.
|
||||
|
||||
Here is example of simple middleware that adds request and response headers:
|
||||
@@ -148,7 +148,7 @@ fn main() {
|
||||
## User sessions
|
||||
|
||||
Actix provides general solution for session management.
|
||||
[*Session storage*](../actix_web/middleware/struct.SessionStorage.html) middleare can be
|
||||
[*Session storage*](../actix_web/middleware/struct.SessionStorage.html) middleware can be
|
||||
use with different backend types to store session data in different backends.
|
||||
By default only cookie session backend is implemented. Other backend implementations
|
||||
could be added later.
|
||||
@@ -162,7 +162,7 @@ You need to pass a random value to the constructor of *CookieSessionBackend*.
|
||||
This is private key for cookie session. When this value is changed, all session data is lost.
|
||||
Note that whatever you write into your session is visible by the user (but not modifiable).
|
||||
|
||||
In general case, you cretate
|
||||
In general case, you create
|
||||
[*Session storage*](../actix_web/middleware/struct.SessionStorage.html) middleware
|
||||
and initializes it with specific backend implementation, like *CookieSessionBackend*.
|
||||
To access session data
|
||||
|
@@ -4,7 +4,7 @@ Actix web automatically upgrades connection to *HTTP/2.0* if possible.
|
||||
|
||||
## Negotiation
|
||||
|
||||
*HTTP/2.0* protocol over tls without prior knowlage requires
|
||||
*HTTP/2.0* protocol over tls without prior knowledge requires
|
||||
[tls alpn](https://tools.ietf.org/html/rfc7301). At the moment only
|
||||
`rust-openssl` has support. Turn on `alpn` feature to enable `alpn` negotiation.
|
||||
With enable `alpn` feature `HttpServer` provides
|
||||
|
@@ -36,8 +36,9 @@ We can send `CreateUser` message to `DbExecutor` actor, and as result we get
|
||||
|
||||
```rust,ignore
|
||||
impl Handler<CreateUser> for DbExecutor {
|
||||
type Result = Result<User, Error>
|
||||
|
||||
fn handle(&mut self, msg: CreateUser, _: &mut Self::Context) -> Response<Self, CreateUser>
|
||||
fn handle(&mut self, msg: CreateUser, _: &mut Self::Context) -> Self::Result
|
||||
{
|
||||
use self::schema::users::dsl::*;
|
||||
|
||||
@@ -59,7 +60,7 @@ impl Handler<CreateUser> for DbExecutor {
|
||||
.load::<models::User>(&self.0)
|
||||
.expect("Error loading person");
|
||||
|
||||
Self::reply(items.pop().unwrap())
|
||||
Ok(items.pop().unwrap())
|
||||
}
|
||||
}
|
||||
```
|
||||
@@ -77,7 +78,7 @@ struct State {
|
||||
fn main() {
|
||||
let sys = actix::System::new("diesel-example");
|
||||
|
||||
// Start 3 parallele db executors
|
||||
// Start 3 parallel db executors
|
||||
let addr = SyncArbiter::start(3, || {
|
||||
DbExecutor(SqliteConnection::establish("test.db").unwrap())
|
||||
});
|
||||
@@ -94,7 +95,7 @@ fn main() {
|
||||
}
|
||||
```
|
||||
|
||||
And finally we can use address in a requst handler. We get message response
|
||||
And finally we can use address in a request handler. We get message response
|
||||
asynchronously, so handler needs to return future object, also `Route::a()` needs to be
|
||||
used for async handler registration.
|
||||
|
||||
|
@@ -2,7 +2,7 @@
|
||||
|
||||
Actix web provides some primitives to build web servers and applications with Rust.
|
||||
It provides routing, middlewares, pre-processing of requests, and post-processing of responses,
|
||||
websocket protcol handling, multipart streams, etc.
|
||||
websocket protocol handling, multipart streams, etc.
|
||||
|
||||
All actix web server is built around `Application` instance.
|
||||
It is used for registering routes for resources, middlewares.
|
||||
@@ -10,9 +10,9 @@ Also it stores application specific state that is shared across all handlers
|
||||
within same application.
|
||||
|
||||
Application acts as namespace for all routes, i.e all routes for specific application
|
||||
has same url path prefix. Application prefix always contains laading "/" slash.
|
||||
has same url path prefix. Application prefix always contains leading "/" slash.
|
||||
If supplied prefix does not contain leading slash, it get inserted.
|
||||
Prefix should consists of valud path segments. i.e for application with prefix `/app`
|
||||
Prefix should consists of value path segments. i.e for application with prefix `/app`
|
||||
any request with following paths `/app`, `/app/` or `/app/test` would match,
|
||||
but path `/application` would not match.
|
||||
|
||||
|
@@ -2,7 +2,7 @@
|
||||
|
||||
[*HttpServer*](../actix_web/struct.HttpServer.html) type is responsible for
|
||||
serving http requests. *HttpServer* accept application factory as a parameter,
|
||||
Application factory must have `Send` + `Sync` bounderies. More about that in
|
||||
Application factory must have `Send` + `Sync` boundaries. More about that in
|
||||
*multi-threading* section. To bind to specific socket address `bind()` must be used.
|
||||
This method could be called multiple times. To start http server one of the *start*
|
||||
methods could be used. `start()` method start simple server, `start_tls()` or `start_ssl()`
|
||||
|
@@ -5,7 +5,7 @@ and [`ResponseError` trait](../actix_web/error/trait.ResponseError.html)
|
||||
for handling handler's errors.
|
||||
Any error that implements `ResponseError` trait can be returned as error value.
|
||||
*Handler* can return *Result* object, actix by default provides
|
||||
`Responder` implemenation for compatible result object. Here is implementation
|
||||
`Responder` implementation for compatible result object. Here is implementation
|
||||
definition:
|
||||
|
||||
```rust,ignore
|
||||
@@ -134,3 +134,18 @@ fn index(req: HttpRequest) -> Result<&'static str> {
|
||||
```
|
||||
|
||||
In this example *BAD REQUEST* response get generated for `MyError` error.
|
||||
|
||||
## Error logging
|
||||
|
||||
Actix logs all errors with `WARN` log level. If log level set to `DEBUG`
|
||||
and `RUST_BACKTRACE` is enabled, backtrace get logged. The Error type uses
|
||||
cause's error backtrace if available, if the underlying failure does not provide
|
||||
a backtrace, a new backtrace is constructed pointing to that conversion point
|
||||
(rather than the origin of the error). This construction only happens if there
|
||||
is no underlying backtrace; if it does have a backtrace no new backtrace is constructed.
|
||||
|
||||
You can enable backtrace and debug logging with following command:
|
||||
|
||||
```
|
||||
>> RUST_BACKTRACE=1 RUST_LOG=actix_web=debug cargo run
|
||||
```
|
||||
|
@@ -2,15 +2,15 @@
|
||||
|
||||
URL dispatch provides a simple way to map URLs to `Handler` code using a simple pattern matching
|
||||
language. *Regex* crate and it's
|
||||
[*RegexSet*](https://doc.rust-lang.org/regex/regex/struct.RegexSet.html) is beeing used for
|
||||
[*RegexSet*](https://doc.rust-lang.org/regex/regex/struct.RegexSet.html) is being used for
|
||||
pattern matching. If one of the patterns matches the path information associated with a request,
|
||||
a particular handler object is invoked. A handler is a specific object that implements
|
||||
`Handler` trait, defined in your application, that receives the request and returns
|
||||
a response object. More informatin is available in [handler section](../qs_4.html).
|
||||
a response object. More information is available in [handler section](../qs_4.html).
|
||||
|
||||
## Resource configuration
|
||||
|
||||
Resource configuraiton is the act of adding a new resource to an application.
|
||||
Resource configuration is the act of adding a new resource to an application.
|
||||
A resource has a name, which acts as an identifier to be used for URL generation.
|
||||
The name also allows developers to add routes to existing resources.
|
||||
A resource also has a pattern, meant to match against the *PATH* portion of a *URL*,
|
||||
@@ -19,7 +19,7 @@ port, e.g., */foo/bar* in the *URL* *http://localhost:8080/foo/bar?q=value*).
|
||||
|
||||
The [Application::resource](../actix_web/struct.Application.html#method.resource) methods
|
||||
add a single resource to application routing table. This method accepts *path pattern*
|
||||
and resource configuration funnction.
|
||||
and resource configuration function.
|
||||
|
||||
```rust
|
||||
# extern crate actix_web;
|
||||
@@ -39,20 +39,20 @@ fn main() {
|
||||
}
|
||||
```
|
||||
|
||||
*Configuraiton function* has following type:
|
||||
*Configuration function* has following type:
|
||||
|
||||
```rust,ignore
|
||||
FnOnce(&mut Resource<_>) -> ()
|
||||
```
|
||||
|
||||
*Configration function* can set name and register specific routes.
|
||||
*Configuration function* can set name and register specific routes.
|
||||
If resource does not contain any route or does not have any matching routes it
|
||||
returns *NOT FOUND* http resources.
|
||||
|
||||
## Configuring a Route
|
||||
|
||||
Resource contains set of routes. Each route in turn has set of predicates and handler.
|
||||
New route could be crearted with `Resource::route()` method which returns reference
|
||||
New route could be created with `Resource::route()` method which returns reference
|
||||
to new *Route* instance. By default *route* does not contain any predicates, so matches
|
||||
all requests and default handler is `HTTPNotFound`.
|
||||
|
||||
@@ -91,17 +91,17 @@ builder-like pattern. Following configuration methods are available:
|
||||
any number of predicates could be registered for each route.
|
||||
|
||||
* [*Route::f()*](../actix_web/struct.Route.html#method.f) method registers handler function
|
||||
for this route. Only one handler could be registered. Usually handler registeration
|
||||
is the last config operation. Handler fanction could be function or closure and has type
|
||||
for this route. Only one handler could be registered. Usually handler registration
|
||||
is the last config operation. Handler function could be function or closure and has type
|
||||
`Fn(HttpRequest<S>) -> R + 'static`
|
||||
|
||||
* [*Route::h()*](../actix_web/struct.Route.html#method.h) method registers handler object
|
||||
that implements `Handler` trait. This is similar to `f()` method, only one handler could
|
||||
be registered. Handler registeration is the last config operation.
|
||||
be registered. Handler registration is the last config operation.
|
||||
|
||||
* [*Route::a()*](../actix_web/struct.Route.html#method.a) method registers asynchandler
|
||||
function for this route. Only one handler could be registered. Handler registeration
|
||||
is the last config operation. Handler fanction could be function or closure and has type
|
||||
* [*Route::a()*](../actix_web/struct.Route.html#method.a) method registers async handler
|
||||
function for this route. Only one handler could be registered. Handler registration
|
||||
is the last config operation. Handler function could be function or closure and has type
|
||||
`Fn(HttpRequest<S>) -> Future<Item = HttpResponse, Error = Error> + 'static`
|
||||
|
||||
## Route matching
|
||||
@@ -112,7 +112,7 @@ against a URL path pattern. `path` represents the path portion of the URL that w
|
||||
The way that *actix* does this is very simple. When a request enters the system,
|
||||
for each resource configuration registration present in the system, actix checks
|
||||
the request's path against the pattern declared. *Regex* crate and it's
|
||||
[*RegexSet*](https://doc.rust-lang.org/regex/regex/struct.RegexSet.html) is beeing used for
|
||||
[*RegexSet*](https://doc.rust-lang.org/regex/regex/struct.RegexSet.html) is being used for
|
||||
pattern matching. If resource could not be found, *default resource* get used as matched
|
||||
resource.
|
||||
|
||||
@@ -516,7 +516,7 @@ Predicates can have access to application's state via `HttpRequest::state()` met
|
||||
Also predicates can store extra information in
|
||||
[requests`s extensions](../actix_web/struct.HttpRequest.html#method.extensions).
|
||||
|
||||
### Modifing predicate values
|
||||
### Modifying predicate values
|
||||
|
||||
You can invert the meaning of any predicate value by wrapping it in a `Not` predicate.
|
||||
For example if you want to return "METHOD NOT ALLOWED" response for all methods
|
||||
|
@@ -4,7 +4,7 @@
|
||||
|
||||
Builder-like patter is used to construct an instance of `HttpResponse`.
|
||||
`HttpResponse` provides several method that returns `HttpResponseBuilder` instance,
|
||||
which is implements various convinience methods that helps build response.
|
||||
which is implements various convenience methods that helps build response.
|
||||
Check [documentation](../actix_web/dev/struct.HttpResponseBuilder.html)
|
||||
for type description. Methods `.body`, `.finish`, `.json` finalizes response creation and
|
||||
returns constructed *HttpResponse* instance. if this methods get called for the same
|
||||
@@ -91,7 +91,7 @@ fn index(mut req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>> {
|
||||
# fn main() {}
|
||||
```
|
||||
|
||||
Or you can manually load payload into memory and ther deserialize it.
|
||||
Or you can manually load payload into memory and then deserialize it.
|
||||
Here is simple example. We will deserialize *MyObj* struct. We need to load request
|
||||
body first and then deserialize json into object.
|
||||
|
||||
@@ -200,7 +200,7 @@ fn index(req: HttpRequest) -> Box<Future<...>> {
|
||||
match item {
|
||||
// Handle multipart Field
|
||||
multipart::MultipartItem::Field(field) => {
|
||||
println!("==== FIELD ==== {:?} {:?}", field.heders(), field.content_type());
|
||||
println!("==== FIELD ==== {:?} {:?}", field.headers(), field.content_type());
|
||||
|
||||
Either::A(
|
||||
// Field in turn is a stream of *Bytes* objects
|
||||
@@ -259,7 +259,7 @@ fn index(mut req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>> {
|
||||
Actix uses [*Payload*](../actix_web/payload/struct.Payload.html) object as request payload stream.
|
||||
*HttpRequest* provides several methods, which can be used for payload access.
|
||||
At the same time *Payload* implements *Stream* trait, so it could be used with various
|
||||
stream combinators. Also *Payload* provides serveral convinience methods that return
|
||||
stream combinators. Also *Payload* provides several convenience methods that return
|
||||
future object that resolve to Bytes object.
|
||||
|
||||
* *readany()* method returns *Stream* of *Bytes* objects.
|
||||
@@ -283,7 +283,7 @@ use futures::{Future, Stream};
|
||||
|
||||
|
||||
fn index(mut req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>> {
|
||||
req.payload_mut()
|
||||
req.payload()
|
||||
.readany()
|
||||
.from_err()
|
||||
.fold((), |_, chunk| {
|
||||
|
@@ -3,7 +3,7 @@
|
||||
Actix supports WebSockets out-of-the-box. It is possible to convert request's `Payload`
|
||||
to a stream of [*ws::Message*](../actix_web/ws/enum.Message.html) with
|
||||
a [*ws::WsStream*](../actix_web/ws/struct.WsStream.html) and then use stream
|
||||
combinators to handle actual messages. But it is simplier to handle websocket communications
|
||||
combinators to handle actual messages. But it is simpler to handle websocket communications
|
||||
with http actor.
|
||||
|
||||
This is example of simple websocket echo server:
|
||||
|
@@ -59,9 +59,11 @@ impl<S: 'static> PipelineHandler<S> for Inner<S> {
|
||||
|
||||
#[cfg(test)]
|
||||
impl<S: 'static> HttpApplication<S> {
|
||||
#[cfg(test)]
|
||||
pub(crate) fn run(&mut self, req: HttpRequest<S>) -> Reply {
|
||||
self.inner.borrow_mut().handle(req)
|
||||
}
|
||||
#[cfg(test)]
|
||||
pub(crate) fn prepare_request(&self, req: HttpRequest) -> HttpRequest<S> {
|
||||
req.with_state(Rc::clone(&self.state), self.router.clone())
|
||||
}
|
||||
@@ -134,7 +136,7 @@ impl<S> Application<S> where S: 'static {
|
||||
/// Create application with specific state. Application can be
|
||||
/// configured with builder-like pattern.
|
||||
///
|
||||
/// State is shared with all reousrces within same application and could be
|
||||
/// State is shared with all resources within same application and could be
|
||||
/// accessed with `HttpRequest::state()` method.
|
||||
pub fn with_state(state: S) -> Application<S> {
|
||||
Application {
|
||||
@@ -154,7 +156,7 @@ impl<S> Application<S> where S: 'static {
|
||||
/// Set application prefix
|
||||
///
|
||||
/// Only requests that matches application's prefix get processed by this application.
|
||||
/// Application prefix always contains laading "/" slash. If supplied prefix
|
||||
/// Application prefix always contains leading "/" slash. If supplied prefix
|
||||
/// does not contain leading slash, it get inserted. Prefix should
|
||||
/// consists valid path segments. i.e for application with
|
||||
/// prefix `/app` any request with following paths `/app`, `/app/` or `/app/test`
|
||||
@@ -356,6 +358,40 @@ impl<S> Application<S> where S: 'static {
|
||||
middlewares: Rc::new(parts.middlewares),
|
||||
}
|
||||
}
|
||||
|
||||
/// Convenience method for creating `Box<HttpHandler>` instance.
|
||||
///
|
||||
/// This method is useful if you need to register several application instances
|
||||
/// with different state.
|
||||
///
|
||||
/// ```rust
|
||||
/// # use std::thread;
|
||||
/// # extern crate actix_web;
|
||||
/// use actix_web::*;
|
||||
///
|
||||
/// struct State1;
|
||||
///
|
||||
/// struct State2;
|
||||
///
|
||||
/// fn main() {
|
||||
/// # thread::spawn(|| {
|
||||
/// HttpServer::new(|| { vec![
|
||||
/// Application::with_state(State1)
|
||||
/// .prefix("/app1")
|
||||
/// .resource("/", |r| r.h(httpcodes::HTTPOk))
|
||||
/// .boxed(),
|
||||
/// Application::with_state(State2)
|
||||
/// .prefix("/app2")
|
||||
/// .resource("/", |r| r.h(httpcodes::HTTPOk))
|
||||
/// .boxed() ]})
|
||||
/// .bind("127.0.0.1:8080").unwrap()
|
||||
/// .run()
|
||||
/// # });
|
||||
/// }
|
||||
/// ```
|
||||
pub fn boxed(mut self) -> Box<HttpHandler> {
|
||||
Box::new(self.finish())
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: 'static> IntoHttpHandler for Application<S> {
|
||||
|
20
src/body.rs
20
src/body.rs
@@ -1,4 +1,4 @@
|
||||
use std::fmt;
|
||||
use std::{fmt, mem};
|
||||
use std::rc::Rc;
|
||||
use std::sync::Arc;
|
||||
use bytes::{Bytes, BytesMut};
|
||||
@@ -31,7 +31,7 @@ pub enum Binary {
|
||||
Bytes(Bytes),
|
||||
/// Static slice
|
||||
Slice(&'static [u8]),
|
||||
/// Shared stirng body
|
||||
/// Shared string body
|
||||
SharedString(Rc<String>),
|
||||
/// Shared string body
|
||||
#[doc(hidden)]
|
||||
@@ -122,6 +122,22 @@ impl Binary {
|
||||
pub fn from_slice(s: &[u8]) -> Binary {
|
||||
Binary::Bytes(Bytes::from(s))
|
||||
}
|
||||
|
||||
/// Convert Binary to a Bytes instance
|
||||
pub fn take(&mut self) -> Bytes {
|
||||
mem::replace(self, Binary::Slice(b"")).into()
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for Binary {
|
||||
fn clone(&self) -> Binary {
|
||||
match *self {
|
||||
Binary::Bytes(ref bytes) => Binary::Bytes(bytes.clone()),
|
||||
Binary::Slice(slice) => Binary::Bytes(Bytes::from(slice)),
|
||||
Binary::SharedString(ref s) => Binary::Bytes(Bytes::from(s.as_str())),
|
||||
Binary::ArcSharedString(ref s) => Binary::Bytes(Bytes::from(s.as_str())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Into<Bytes> for Binary {
|
||||
|
@@ -1,9 +1,9 @@
|
||||
use std;
|
||||
use std::marker::PhantomData;
|
||||
use std::collections::VecDeque;
|
||||
use futures::{Async, Future, Poll};
|
||||
use futures::sync::oneshot::Sender;
|
||||
use futures::unsync::oneshot;
|
||||
use smallvec::SmallVec;
|
||||
|
||||
use actix::{Actor, ActorState, ActorContext, AsyncContext,
|
||||
Address, SyncAddress, Handler, Subscriber, ResponseType, SpawnHandle};
|
||||
@@ -18,7 +18,7 @@ use httprequest::HttpRequest;
|
||||
|
||||
pub trait ActorHttpContext: 'static {
|
||||
fn disconnected(&mut self);
|
||||
fn poll(&mut self) -> Poll<Option<Frame>, Error>;
|
||||
fn poll(&mut self) -> Poll<Option<SmallVec<[Frame; 2]>>, Error>;
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -31,7 +31,7 @@ pub enum Frame {
|
||||
pub struct HttpContext<A, S=()> where A: Actor<Context=HttpContext<A, S>>,
|
||||
{
|
||||
inner: ContextImpl<A>,
|
||||
stream: VecDeque<Frame>,
|
||||
stream: Option<SmallVec<[Frame; 2]>>,
|
||||
request: HttpRequest<S>,
|
||||
disconnected: bool,
|
||||
}
|
||||
@@ -91,7 +91,7 @@ impl<A, S: 'static> HttpContext<A, S> where A: Actor<Context=Self> {
|
||||
pub fn from_request(req: HttpRequest<S>) -> HttpContext<A, S> {
|
||||
HttpContext {
|
||||
inner: ContextImpl::new(None),
|
||||
stream: VecDeque::new(),
|
||||
stream: None,
|
||||
request: req,
|
||||
disconnected: false,
|
||||
}
|
||||
@@ -121,23 +121,23 @@ impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> {
|
||||
#[inline]
|
||||
pub fn write<B: Into<Binary>>(&mut self, data: B) {
|
||||
if !self.disconnected {
|
||||
self.stream.push_back(Frame::Chunk(Some(data.into())));
|
||||
self.add_frame(Frame::Chunk(Some(data.into())));
|
||||
} else {
|
||||
warn!("Trying to write to disconnected response");
|
||||
}
|
||||
}
|
||||
|
||||
/// Indicate end of streamimng payload. Also this method calls `Self::close`.
|
||||
/// Indicate end of streaming payload. Also this method calls `Self::close`.
|
||||
#[inline]
|
||||
pub fn write_eof(&mut self) {
|
||||
self.stream.push_back(Frame::Chunk(None));
|
||||
self.add_frame(Frame::Chunk(None));
|
||||
}
|
||||
|
||||
/// Returns drain future
|
||||
pub fn drain(&mut self) -> Drain<A> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.inner.modify();
|
||||
self.stream.push_back(Frame::Drain(tx));
|
||||
self.add_frame(Frame::Drain(tx));
|
||||
Drain::new(rx)
|
||||
}
|
||||
|
||||
@@ -146,6 +146,14 @@ impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> {
|
||||
pub fn connected(&self) -> bool {
|
||||
!self.disconnected
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn add_frame(&mut self, frame: Frame) {
|
||||
if self.stream.is_none() {
|
||||
self.stream = Some(SmallVec::new());
|
||||
}
|
||||
self.stream.as_mut().map(|s| s.push(frame));
|
||||
}
|
||||
}
|
||||
|
||||
impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> {
|
||||
@@ -176,7 +184,7 @@ impl<A, S> ActorHttpContext for HttpContext<A, S> where A: Actor<Context=Self>,
|
||||
self.stop();
|
||||
}
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Frame>, Error> {
|
||||
fn poll(&mut self) -> Poll<Option<SmallVec<[Frame; 2]>>, Error> {
|
||||
let ctx: &mut HttpContext<A, S> = unsafe {
|
||||
std::mem::transmute(self as &mut HttpContext<A, S>)
|
||||
};
|
||||
@@ -189,8 +197,8 @@ impl<A, S> ActorHttpContext for HttpContext<A, S> where A: Actor<Context=Self>,
|
||||
}
|
||||
|
||||
// frames
|
||||
if let Some(frame) = self.stream.pop_front() {
|
||||
Ok(Async::Ready(Some(frame)))
|
||||
if let Some(data) = self.stream.take() {
|
||||
Ok(Async::Ready(Some(data)))
|
||||
} else if self.inner.alive() {
|
||||
Ok(Async::NotReady)
|
||||
} else {
|
||||
|
220
src/error.rs
220
src/error.rs
@@ -9,8 +9,8 @@ use std::error::Error as StdError;
|
||||
|
||||
use cookie;
|
||||
use httparse;
|
||||
use failure::Fail;
|
||||
use futures::Canceled;
|
||||
use failure::{Fail, Backtrace};
|
||||
use http2::Error as Http2Error;
|
||||
use http::{header, StatusCode, Error as HttpError};
|
||||
use http::uri::InvalidUriBytes;
|
||||
@@ -22,6 +22,8 @@ use url::ParseError as UrlParseError;
|
||||
pub use cookie::{ParseError as CookieParseError};
|
||||
|
||||
use body::Body;
|
||||
use handler::Responder;
|
||||
use httprequest::HttpRequest;
|
||||
use httpresponse::HttpResponse;
|
||||
use httpcodes::{HTTPBadRequest, HTTPMethodNotAllowed, HTTPExpectationFailed};
|
||||
|
||||
@@ -33,9 +35,9 @@ use httpcodes::{HTTPBadRequest, HTTPMethodNotAllowed, HTTPExpectationFailed};
|
||||
pub type Result<T, E=Error> = result::Result<T, E>;
|
||||
|
||||
/// General purpose actix web error
|
||||
#[derive(Fail, Debug)]
|
||||
pub struct Error {
|
||||
cause: Box<ResponseError>,
|
||||
backtrace: Option<Backtrace>,
|
||||
}
|
||||
|
||||
impl Error {
|
||||
@@ -64,6 +66,16 @@ impl fmt::Display for Error {
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for Error {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
if let Some(bt) = self.cause.backtrace() {
|
||||
write!(f, "{:?}\n\n{:?}", &self.cause, bt)
|
||||
} else {
|
||||
write!(f, "{:?}\n\n{:?}", &self.cause, self.backtrace.as_ref().unwrap())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// `HttpResponse` for `Error`
|
||||
impl From<Error> for HttpResponse {
|
||||
fn from(err: Error) -> Self {
|
||||
@@ -74,7 +86,12 @@ impl From<Error> for HttpResponse {
|
||||
/// `Error` for any error that implements `ResponseError`
|
||||
impl<T: ResponseError> From<T> for Error {
|
||||
fn from(err: T) -> Error {
|
||||
Error { cause: Box::new(err) }
|
||||
let backtrace = if err.backtrace().is_none() {
|
||||
Some(Backtrace::new())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
Error { cause: Box::new(err), backtrace: backtrace }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -320,7 +337,7 @@ pub enum WsHandshakeError {
|
||||
/// Only get method is allowed
|
||||
#[fail(display="Method not allowed")]
|
||||
GetMethodRequired,
|
||||
/// Ugrade header if not set to websocket
|
||||
/// Upgrade header if not set to websocket
|
||||
#[fail(display="Websocket upgrade is expected")]
|
||||
NoWebsocketUpgrade,
|
||||
/// Connection header is not set to upgrade
|
||||
@@ -329,7 +346,7 @@ pub enum WsHandshakeError {
|
||||
/// Websocket version header is not set
|
||||
#[fail(display="Websocket version header is required")]
|
||||
NoVersionHeader,
|
||||
/// Unsupported websockt version
|
||||
/// Unsupported websocket version
|
||||
#[fail(display="Unsupported version")]
|
||||
UnsupportedVersion,
|
||||
/// Websocket key is not set or wrong
|
||||
@@ -478,39 +495,10 @@ impl From<UrlParseError> for UrlGenerationError {
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! ERROR_WRAP {
|
||||
($type:ty, $status:expr) => {
|
||||
unsafe impl<T> Sync for $type {}
|
||||
unsafe impl<T> Send for $type {}
|
||||
|
||||
impl<T> $type {
|
||||
pub fn cause(&self) -> &T {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: fmt::Debug + 'static> Fail for $type {}
|
||||
impl<T: fmt::Debug + 'static> fmt::Display for $type {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "{:?}", self.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> ResponseError for $type
|
||||
where T: Send + Sync + fmt::Debug + 'static,
|
||||
{
|
||||
fn error_response(&self) -> HttpResponse {
|
||||
HttpResponse::new($status, Body::Empty)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper type that can wrap any error and generate *BAD REQUEST* response.
|
||||
/// Helper type that can wrap any error and generate custom response.
|
||||
///
|
||||
/// In following example any `io::Error` will be converted into "BAD REQUEST" response
|
||||
/// as oposite to *INNTERNAL SERVER ERROR* which is defined by default.
|
||||
/// as opposite to *INNTERNAL SERVER ERROR* which is defined by default.
|
||||
///
|
||||
/// ```rust
|
||||
/// # extern crate actix_web;
|
||||
@@ -523,59 +511,133 @@ macro_rules! ERROR_WRAP {
|
||||
/// }
|
||||
/// # fn main() {}
|
||||
/// ```
|
||||
#[derive(Debug)]
|
||||
pub struct ErrorBadRequest<T>(pub T);
|
||||
ERROR_WRAP!(ErrorBadRequest<T>, StatusCode::BAD_REQUEST);
|
||||
pub struct InternalError<T> {
|
||||
cause: T,
|
||||
status: StatusCode,
|
||||
backtrace: Backtrace,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Helper type that can wrap any error and generate *UNAUTHORIZED* response.
|
||||
pub struct ErrorUnauthorized<T>(pub T);
|
||||
ERROR_WRAP!(ErrorUnauthorized<T>, StatusCode::UNAUTHORIZED);
|
||||
unsafe impl<T> Sync for InternalError<T> {}
|
||||
unsafe impl<T> Send for InternalError<T> {}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Helper type that can wrap any error and generate *FORBIDDEN* response.
|
||||
pub struct ErrorForbidden<T>(pub T);
|
||||
ERROR_WRAP!(ErrorForbidden<T>, StatusCode::FORBIDDEN);
|
||||
impl<T> InternalError<T> {
|
||||
pub fn new(err: T, status: StatusCode) -> Self {
|
||||
InternalError {
|
||||
cause: err,
|
||||
status: status,
|
||||
backtrace: Backtrace::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Helper type that can wrap any error and generate *NOT FOUND* response.
|
||||
pub struct ErrorNotFound<T>(pub T);
|
||||
ERROR_WRAP!(ErrorNotFound<T>, StatusCode::NOT_FOUND);
|
||||
impl<T> Fail for InternalError<T>
|
||||
where T: Send + Sync + fmt::Debug + 'static
|
||||
{
|
||||
fn backtrace(&self) -> Option<&Backtrace> {
|
||||
Some(&self.backtrace)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Helper type that can wrap any error and generate *METHOD NOT ALLOWED* response.
|
||||
pub struct ErrorMethodNotAllowed<T>(pub T);
|
||||
ERROR_WRAP!(ErrorMethodNotAllowed<T>, StatusCode::METHOD_NOT_ALLOWED);
|
||||
impl<T> fmt::Debug for InternalError<T>
|
||||
where T: Send + Sync + fmt::Debug + 'static
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
fmt::Debug::fmt(&self.cause, f)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Helper type that can wrap any error and generate *REQUEST TIMEOUT* response.
|
||||
pub struct ErrorRequestTimeout<T>(pub T);
|
||||
ERROR_WRAP!(ErrorRequestTimeout<T>, StatusCode::REQUEST_TIMEOUT);
|
||||
impl<T> fmt::Display for InternalError<T>
|
||||
where T: Send + Sync + fmt::Debug + 'static
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
fmt::Debug::fmt(&self.cause, f)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Helper type that can wrap any error and generate *CONFLICT* response.
|
||||
pub struct ErrorConflict<T>(pub T);
|
||||
ERROR_WRAP!(ErrorConflict<T>, StatusCode::CONFLICT);
|
||||
impl<T> ResponseError for InternalError<T>
|
||||
where T: Send + Sync + fmt::Debug + 'static
|
||||
{
|
||||
fn error_response(&self) -> HttpResponse {
|
||||
HttpResponse::new(self.status, Body::Empty)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Helper type that can wrap any error and generate *GONE* response.
|
||||
pub struct ErrorGone<T>(pub T);
|
||||
ERROR_WRAP!(ErrorGone<T>, StatusCode::GONE);
|
||||
impl<T> Responder for InternalError<T>
|
||||
where T: Send + Sync + fmt::Debug + 'static
|
||||
{
|
||||
type Item = HttpResponse;
|
||||
type Error = Error;
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Helper type that can wrap any error and generate *PRECONDITION FAILED* response.
|
||||
pub struct ErrorPreconditionFailed<T>(pub T);
|
||||
ERROR_WRAP!(ErrorPreconditionFailed<T>, StatusCode::PRECONDITION_FAILED);
|
||||
fn respond_to(self, _: HttpRequest) -> Result<HttpResponse, Error> {
|
||||
Err(self.into())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Helper type that can wrap any error and generate *EXPECTATION FAILED* response.
|
||||
pub struct ErrorExpectationFailed<T>(pub T);
|
||||
ERROR_WRAP!(ErrorExpectationFailed<T>, StatusCode::EXPECTATION_FAILED);
|
||||
/// Helper function that creates wrapper of any error and generate *BAD REQUEST* response.
|
||||
#[allow(non_snake_case)]
|
||||
pub fn ErrorBadRequest<T>(err: T) -> InternalError<T> {
|
||||
InternalError::new(err, StatusCode::BAD_REQUEST)
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Helper type that can wrap any error and generate *INTERNAL SERVER ERROR* response.
|
||||
pub struct ErrorInternalServerError<T>(pub T);
|
||||
ERROR_WRAP!(ErrorInternalServerError<T>, StatusCode::INTERNAL_SERVER_ERROR);
|
||||
/// Helper function that creates wrapper of any error and generate *UNAUTHORIZED* response.
|
||||
#[allow(non_snake_case)]
|
||||
pub fn ErrorUnauthorized<T>(err: T) -> InternalError<T> {
|
||||
InternalError::new(err, StatusCode::UNAUTHORIZED)
|
||||
}
|
||||
|
||||
/// Helper function that creates wrapper of any error and generate *FORBIDDEN* response.
|
||||
#[allow(non_snake_case)]
|
||||
pub fn ErrorForbidden<T>(err: T) -> InternalError<T> {
|
||||
InternalError::new(err, StatusCode::FORBIDDEN)
|
||||
}
|
||||
|
||||
/// Helper function that creates wrapper of any error and generate *NOT FOUND* response.
|
||||
#[allow(non_snake_case)]
|
||||
pub fn ErrorNotFound<T>(err: T) -> InternalError<T> {
|
||||
InternalError::new(err, StatusCode::NOT_FOUND)
|
||||
}
|
||||
|
||||
/// Helper function that creates wrapper of any error and generate *METHOD NOT ALLOWED* response.
|
||||
#[allow(non_snake_case)]
|
||||
pub fn ErrorMethodNotAllowed<T>(err: T) -> InternalError<T> {
|
||||
InternalError::new(err, StatusCode::METHOD_NOT_ALLOWED)
|
||||
}
|
||||
|
||||
/// Helper function that creates wrapper of any error and generate *REQUEST TIMEOUT* response.
|
||||
#[allow(non_snake_case)]
|
||||
pub fn ErrorRequestTimeout<T>(err: T) -> InternalError<T> {
|
||||
InternalError::new(err, StatusCode::REQUEST_TIMEOUT)
|
||||
}
|
||||
|
||||
/// Helper function that creates wrapper of any error and generate *CONFLICT* response.
|
||||
#[allow(non_snake_case)]
|
||||
pub fn ErrorConflict<T>(err: T) -> InternalError<T> {
|
||||
InternalError::new(err, StatusCode::CONFLICT)
|
||||
}
|
||||
|
||||
/// Helper function that creates wrapper of any error and generate *GONE* response.
|
||||
#[allow(non_snake_case)]
|
||||
pub fn ErrorGone<T>(err: T) -> InternalError<T> {
|
||||
InternalError::new(err, StatusCode::GONE)
|
||||
}
|
||||
|
||||
/// Helper function that creates wrapper of any error and generate *PRECONDITION FAILED* response.
|
||||
#[allow(non_snake_case)]
|
||||
pub fn ErrorPreconditionFailed<T>(err: T) -> InternalError<T> {
|
||||
InternalError::new(err, StatusCode::PRECONDITION_FAILED)
|
||||
}
|
||||
|
||||
/// Helper function that creates wrapper of any error and generate *EXPECTATION FAILED* response.
|
||||
#[allow(non_snake_case)]
|
||||
pub fn ErrorExpectationFailed<T>(err: T) -> InternalError<T> {
|
||||
InternalError::new(err, StatusCode::EXPECTATION_FAILED)
|
||||
}
|
||||
|
||||
/// Helper function that creates wrapper of any error and generate *INTERNAL SERVER ERROR* response.
|
||||
#[allow(non_snake_case)]
|
||||
pub fn ErrorInternalServerError<T>(err: T) -> InternalError<T> {
|
||||
InternalError::new(err, StatusCode::INTERNAL_SERVER_ERROR)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
@@ -9,8 +9,10 @@ use std::path::{Path, PathBuf};
|
||||
use std::ops::{Deref, DerefMut};
|
||||
|
||||
use mime_guess::get_mime_type;
|
||||
|
||||
use param::FromParam;
|
||||
use handler::{Handler, Responder};
|
||||
use headers::ContentEncoding;
|
||||
use httprequest::HttpRequest;
|
||||
use httpresponse::HttpResponse;
|
||||
use httpcodes::HTTPOk;
|
||||
@@ -83,7 +85,6 @@ impl Responder for NamedFile {
|
||||
|
||||
fn respond_to(mut self, _: HttpRequest) -> Result<HttpResponse, io::Error> {
|
||||
let mut resp = HTTPOk.build();
|
||||
use headers::ContentEncoding;
|
||||
resp.content_encoding(ContentEncoding::Identity);
|
||||
if let Some(ext) = self.path().extension() {
|
||||
let mime = get_mime_type(&ext.to_string_lossy());
|
||||
@@ -138,7 +139,7 @@ impl Responder for Directory {
|
||||
for entry in self.path.read_dir()? {
|
||||
if self.can_list(&entry) {
|
||||
let entry = entry.unwrap();
|
||||
let p = match entry.path().strip_prefix(&self.base) {
|
||||
let p = match entry.path().strip_prefix(&self.path) {
|
||||
Ok(p) => base.join(p),
|
||||
Err(_) => continue
|
||||
};
|
||||
|
@@ -9,7 +9,7 @@ use error::Error;
|
||||
use httprequest::HttpRequest;
|
||||
use httpresponse::HttpResponse;
|
||||
|
||||
/// Trait defines object that could be regestered as route handler
|
||||
/// Trait defines object that could be registered as route handler
|
||||
#[allow(unused_variables)]
|
||||
pub trait Handler<S>: 'static {
|
||||
|
||||
@@ -35,7 +35,7 @@ pub trait Responder {
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
/// Convinience trait that convert `Future` object into `Boxed` future
|
||||
/// Convenience trait that convert `Future` object into `Boxed` future
|
||||
pub trait AsyncResponder<I, E>: Sized {
|
||||
fn responder(self) -> Box<Future<Item=I, Error=E>>;
|
||||
}
|
||||
@@ -193,7 +193,7 @@ impl<I, E> Responder for Box<Future<Item=I, Error=E>>
|
||||
}
|
||||
}
|
||||
|
||||
/// Trait defines object that could be regestered as resource route
|
||||
/// Trait defines object that could be registered as resource route
|
||||
pub(crate) trait RouteHandler<S>: 'static {
|
||||
fn handle(&mut self, req: HttpRequest<S>) -> Reply;
|
||||
}
|
||||
@@ -341,7 +341,7 @@ impl Default for NormalizePath {
|
||||
}
|
||||
|
||||
impl NormalizePath {
|
||||
/// Create new `NoramlizePath` instance
|
||||
/// Create new `NormalizePath` instance
|
||||
pub fn new(append: bool, merge: bool, redirect: StatusCode) -> NormalizePath {
|
||||
NormalizePath {
|
||||
append: append,
|
||||
|
@@ -66,84 +66,6 @@ impl fmt::Write for CachedDate {
|
||||
}
|
||||
}
|
||||
|
||||
/// Internal use only! unsafe
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct SharedBytesPool(RefCell<VecDeque<Rc<BytesMut>>>);
|
||||
|
||||
impl SharedBytesPool {
|
||||
pub fn new() -> SharedBytesPool {
|
||||
SharedBytesPool(RefCell::new(VecDeque::with_capacity(128)))
|
||||
}
|
||||
|
||||
pub fn get_bytes(&self) -> Rc<BytesMut> {
|
||||
if let Some(bytes) = self.0.borrow_mut().pop_front() {
|
||||
bytes
|
||||
} else {
|
||||
Rc::new(BytesMut::new())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn release_bytes(&self, mut bytes: Rc<BytesMut>) {
|
||||
let v = &mut self.0.borrow_mut();
|
||||
if v.len() < 128 {
|
||||
Rc::get_mut(&mut bytes).unwrap().take();
|
||||
v.push_front(bytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct SharedBytes(
|
||||
Option<Rc<BytesMut>>, Option<Rc<SharedBytesPool>>);
|
||||
|
||||
impl Drop for SharedBytes {
|
||||
fn drop(&mut self) {
|
||||
if let Some(ref pool) = self.1 {
|
||||
if let Some(bytes) = self.0.take() {
|
||||
if Rc::strong_count(&bytes) == 1 {
|
||||
pool.release_bytes(bytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SharedBytes {
|
||||
|
||||
pub fn empty() -> Self {
|
||||
SharedBytes(None, None)
|
||||
}
|
||||
|
||||
pub fn new(bytes: Rc<BytesMut>, pool: Rc<SharedBytesPool>) -> SharedBytes {
|
||||
SharedBytes(Some(bytes), Some(pool))
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
#[allow(mutable_transmutes)]
|
||||
#[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref, inline_always))]
|
||||
pub fn get_mut(&self) -> &mut BytesMut {
|
||||
let r: &BytesMut = self.0.as_ref().unwrap().as_ref();
|
||||
unsafe{mem::transmute(r)}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn get_ref(&self) -> &BytesMut {
|
||||
self.0.as_ref().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for SharedBytes {
|
||||
fn default() -> Self {
|
||||
SharedBytes(Some(Rc::new(BytesMut::new())), None)
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for SharedBytes {
|
||||
fn clone(&self) -> SharedBytes {
|
||||
SharedBytes(self.0.clone(), self.1.clone())
|
||||
}
|
||||
}
|
||||
|
||||
/// Internal use only! unsafe
|
||||
pub(crate) struct SharedMessagePool(RefCell<VecDeque<Rc<HttpMessage>>>);
|
||||
|
||||
|
@@ -222,7 +222,7 @@ impl<S> HttpRequest<S> {
|
||||
self.uri().path()
|
||||
}
|
||||
|
||||
/// Get *ConnectionInfo* for currect request.
|
||||
/// Get *ConnectionInfo* for correct request.
|
||||
pub fn connection_info(&self) -> &ConnectionInfo {
|
||||
if self.as_ref().info.is_none() {
|
||||
let info: ConnectionInfo<'static> = unsafe{
|
||||
@@ -278,7 +278,7 @@ impl<S> HttpRequest<S> {
|
||||
|
||||
/// Peer socket address
|
||||
///
|
||||
/// Peer address is actuall socket address, if proxy is used in front of
|
||||
/// Peer address is actual socket address, if proxy is used in front of
|
||||
/// actix http server, then peer address would be address of this proxy.
|
||||
///
|
||||
/// To get client connection information `connection_info()` method should be used.
|
||||
|
@@ -158,14 +158,14 @@ impl HttpResponse {
|
||||
|
||||
/// is chunked encoding enabled
|
||||
#[inline]
|
||||
pub fn chunked(&self) -> bool {
|
||||
pub fn chunked(&self) -> Option<bool> {
|
||||
self.get_ref().chunked
|
||||
}
|
||||
|
||||
/// Content encoding
|
||||
#[inline]
|
||||
pub fn content_encoding(&self) -> &ContentEncoding {
|
||||
&self.get_ref().encoding
|
||||
pub fn content_encoding(&self) -> ContentEncoding {
|
||||
self.get_ref().encoding
|
||||
}
|
||||
|
||||
/// Set content encoding
|
||||
@@ -329,7 +329,16 @@ impl HttpResponseBuilder {
|
||||
#[inline]
|
||||
pub fn chunked(&mut self) -> &mut Self {
|
||||
if let Some(parts) = parts(&mut self.response, &self.err) {
|
||||
parts.chunked = true;
|
||||
parts.chunked = Some(true);
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
/// Force disable chunked encoding
|
||||
#[inline]
|
||||
pub fn no_chunking(&mut self) -> &mut Self {
|
||||
if let Some(parts) = parts(&mut self.response, &self.err) {
|
||||
parts.chunked = Some(false);
|
||||
}
|
||||
self
|
||||
}
|
||||
@@ -414,8 +423,8 @@ impl HttpResponseBuilder {
|
||||
}
|
||||
|
||||
/// This method calls provided closure with builder reference if value is Some.
|
||||
pub fn if_some<T, F>(&mut self, value: Option<&T>, f: F) -> &mut Self
|
||||
where F: FnOnce(&T, &mut HttpResponseBuilder)
|
||||
pub fn if_some<T, F>(&mut self, value: Option<T>, f: F) -> &mut Self
|
||||
where F: FnOnce(T, &mut HttpResponseBuilder)
|
||||
{
|
||||
if let Some(val) = value {
|
||||
f(val, self);
|
||||
@@ -641,7 +650,7 @@ struct InnerHttpResponse {
|
||||
status: StatusCode,
|
||||
reason: Option<&'static str>,
|
||||
body: Body,
|
||||
chunked: bool,
|
||||
chunked: Option<bool>,
|
||||
encoding: ContentEncoding,
|
||||
connection_type: Option<ConnectionType>,
|
||||
response_size: u64,
|
||||
@@ -658,7 +667,7 @@ impl InnerHttpResponse {
|
||||
status: status,
|
||||
reason: None,
|
||||
body: body,
|
||||
chunked: false,
|
||||
chunked: None,
|
||||
encoding: ContentEncoding::Auto,
|
||||
connection_type: None,
|
||||
response_size: 0,
|
||||
@@ -709,7 +718,7 @@ impl Pool {
|
||||
if v.len() < 128 {
|
||||
inner.headers.clear();
|
||||
inner.version = None;
|
||||
inner.chunked = false;
|
||||
inner.chunked = None;
|
||||
inner.reason = None;
|
||||
inner.encoding = ContentEncoding::Auto;
|
||||
inner.connection_type = None;
|
||||
@@ -803,11 +812,11 @@ mod tests {
|
||||
#[test]
|
||||
fn test_content_encoding() {
|
||||
let resp = HttpResponse::build(StatusCode::OK).finish().unwrap();
|
||||
assert_eq!(*resp.content_encoding(), ContentEncoding::Auto);
|
||||
assert_eq!(resp.content_encoding(), ContentEncoding::Auto);
|
||||
|
||||
let resp = HttpResponse::build(StatusCode::OK)
|
||||
.content_encoding(ContentEncoding::Br).finish().unwrap();
|
||||
assert_eq!(*resp.content_encoding(), ContentEncoding::Br);
|
||||
assert_eq!(resp.content_encoding(), ContentEncoding::Br);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@@ -100,7 +100,7 @@ pub enum CorsBuilderError {
|
||||
ParseError(http::Error),
|
||||
/// Credentials are allowed, but the Origin is set to "*". This is not allowed by W3C
|
||||
///
|
||||
/// This is a misconfiguration. Check the docuemntation for `Cors`.
|
||||
/// This is a misconfiguration. Check the documentation for `Cors`.
|
||||
#[fail(display="Credentials are allowed, but the Origin is set to \"*\"")]
|
||||
CredentialsWithWildcardOrigin,
|
||||
}
|
||||
@@ -214,7 +214,7 @@ impl Cors {
|
||||
/// This method register cors middleware with resource and
|
||||
/// adds route for *OPTIONS* preflight requests.
|
||||
///
|
||||
/// It is possible to register *Cors* middlware with `Resource::middleware()`
|
||||
/// It is possible to register *Cors* middleware with `Resource::middleware()`
|
||||
/// method, but in that case *Cors* middleware wont be able to handle *OPTIONS*
|
||||
/// requests.
|
||||
pub fn register<S: 'static>(self, resource: &mut Resource<S>) {
|
||||
@@ -295,16 +295,23 @@ impl<S> Middleware<S> for Cors {
|
||||
self.validate_allowed_method(req)?;
|
||||
self.validate_allowed_headers(req)?;
|
||||
|
||||
// allowed headers
|
||||
let headers = if let Some(headers) = self.headers.as_ref() {
|
||||
Some(HeaderValue::try_from(&headers.iter().fold(
|
||||
String::new(), |s, v| s + "," + v.as_str()).as_str()[1..]).unwrap())
|
||||
} else if let Some(hdr) = req.headers().get(header::ACCESS_CONTROL_REQUEST_HEADERS) {
|
||||
Some(hdr.clone())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
Ok(Started::Response(
|
||||
HTTPOk.build()
|
||||
.if_some(self.max_age.as_ref(), |max_age, resp| {
|
||||
let _ = resp.header(
|
||||
header::ACCESS_CONTROL_MAX_AGE, format!("{}", max_age).as_str());})
|
||||
.if_some(self.headers.as_ref(), |headers, resp| {
|
||||
let _ = resp.header(
|
||||
header::ACCESS_CONTROL_ALLOW_HEADERS,
|
||||
&headers.iter().fold(
|
||||
String::new(), |s, v| s + "," + v.as_str()).as_str()[1..]);})
|
||||
.if_some(headers, |headers, resp| {
|
||||
let _ = resp.header(header::ACCESS_CONTROL_ALLOW_HEADERS, headers); })
|
||||
.if_true(self.origins.is_all(), |resp| {
|
||||
if self.send_wildcard {
|
||||
resp.header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*");
|
||||
@@ -536,7 +543,7 @@ impl CorsBuilder {
|
||||
}
|
||||
|
||||
/// Set a list of headers which are safe to expose to the API of a CORS API specification.
|
||||
/// This corresponds to the `Access-Control-Expose-Headers` responde header.
|
||||
/// This corresponds to the `Access-Control-Expose-Headers` response header.
|
||||
///
|
||||
/// This is the `list of exposed headers` in the
|
||||
/// [Resource Processing Model](https://www.w3.org/TR/cors/#resource-processing-model).
|
||||
@@ -584,7 +591,6 @@ impl CorsBuilder {
|
||||
/// in an `Error::CredentialsWithWildcardOrigin` error during actix launch or runtime.
|
||||
///
|
||||
/// Defaults to `false`.
|
||||
#[cfg_attr(feature = "serialization", serde(default))]
|
||||
pub fn send_wildcard(&mut self) -> &mut CorsBuilder {
|
||||
if let Some(cors) = cors(&mut self.cors, &self.error) {
|
||||
cors.send_wildcard = true
|
||||
|
@@ -217,7 +217,7 @@ pub struct CookieSession {
|
||||
inner: Rc<CookieSessionInner>,
|
||||
}
|
||||
|
||||
/// Errors that can occure during handling cookie session
|
||||
/// Errors that can occur during handling cookie session
|
||||
#[derive(Fail, Debug)]
|
||||
pub enum CookieSessionError {
|
||||
/// Size of the serialized session is greater than 4000 bytes.
|
||||
|
@@ -6,7 +6,7 @@ use std::slice::Iter;
|
||||
use std::borrow::Cow;
|
||||
use smallvec::SmallVec;
|
||||
|
||||
use error::{ResponseError, UriSegmentError, ErrorBadRequest};
|
||||
use error::{ResponseError, UriSegmentError, InternalError, ErrorBadRequest};
|
||||
|
||||
|
||||
/// A trait to abstract the idea of creating a new instance of a type from a path parameter.
|
||||
@@ -77,7 +77,7 @@ impl<'a> Params<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Return iterator to items in paramter container
|
||||
/// Return iterator to items in parameter container
|
||||
pub fn iter(&self) -> Iter<(Cow<'a, str>, Cow<'a, str>)> {
|
||||
self.0.iter()
|
||||
}
|
||||
@@ -141,7 +141,7 @@ impl FromParam for PathBuf {
|
||||
macro_rules! FROM_STR {
|
||||
($type:ty) => {
|
||||
impl FromParam for $type {
|
||||
type Err = ErrorBadRequest<<$type as FromStr>::Err>;
|
||||
type Err = InternalError<<$type as FromStr>::Err>;
|
||||
|
||||
fn from_param(val: &str) -> Result<Self, Self::Err> {
|
||||
<$type as FromStr>::from_str(val).map_err(ErrorBadRequest)
|
||||
|
105
src/pipeline.rs
105
src/pipeline.rs
@@ -3,6 +3,7 @@ use std::rc::Rc;
|
||||
use std::cell::RefCell;
|
||||
use std::marker::PhantomData;
|
||||
|
||||
use log::Level::Debug;
|
||||
use futures::{Async, Poll, Future, Stream};
|
||||
use futures::unsync::oneshot;
|
||||
|
||||
@@ -56,7 +57,7 @@ impl<S: 'static, H: PipelineHandler<S>> PipelineState<S, H> {
|
||||
|
||||
struct PipelineInfo<S> {
|
||||
req: HttpRequest<S>,
|
||||
count: usize,
|
||||
count: u16,
|
||||
mws: Rc<Vec<Box<Middleware<S>>>>,
|
||||
context: Option<Box<ActorHttpContext>>,
|
||||
error: Option<Error>,
|
||||
@@ -210,14 +211,14 @@ impl<S: 'static, H: PipelineHandler<S>> StartMiddlewares<S, H> {
|
||||
|
||||
fn init(info: &mut PipelineInfo<S>, handler: Rc<RefCell<H>>) -> PipelineState<S, H> {
|
||||
// execute middlewares, we need this stage because middlewares could be non-async
|
||||
// and we can move to next state immidietly
|
||||
let len = info.mws.len();
|
||||
// and we can move to next state immediately
|
||||
let len = info.mws.len() as u16;
|
||||
loop {
|
||||
if info.count == len {
|
||||
let reply = handler.borrow_mut().handle(info.req.clone());
|
||||
return WaitingResponse::init(info, reply)
|
||||
} else {
|
||||
match info.mws[info.count].start(&mut info.req) {
|
||||
match info.mws[info.count as usize].start(&mut info.req) {
|
||||
Ok(Started::Done) =>
|
||||
info.count += 1,
|
||||
Ok(Started::Response(resp)) =>
|
||||
@@ -246,7 +247,7 @@ impl<S: 'static, H: PipelineHandler<S>> StartMiddlewares<S, H> {
|
||||
}
|
||||
|
||||
fn poll(&mut self, info: &mut PipelineInfo<S>) -> Option<PipelineState<S, H>> {
|
||||
let len = info.mws.len();
|
||||
let len = info.mws.len() as u16;
|
||||
'outer: loop {
|
||||
match self.fut.as_mut().unwrap().poll() {
|
||||
Ok(Async::NotReady) => return None,
|
||||
@@ -260,7 +261,7 @@ impl<S: 'static, H: PipelineHandler<S>> StartMiddlewares<S, H> {
|
||||
return Some(WaitingResponse::init(info, reply));
|
||||
} else {
|
||||
loop {
|
||||
match info.mws[info.count].start(info.req_mut()) {
|
||||
match info.mws[info.count as usize].start(info.req_mut()) {
|
||||
Ok(Started::Done) =>
|
||||
info.count += 1,
|
||||
Ok(Started::Response(resp)) => {
|
||||
@@ -334,7 +335,7 @@ impl<S: 'static, H> RunMiddlewares<S, H> {
|
||||
loop {
|
||||
resp = match info.mws[curr].response(info.req_mut(), resp) {
|
||||
Err(err) => {
|
||||
info.count = curr + 1;
|
||||
info.count = (curr + 1) as u16;
|
||||
return ProcessResponse::init(err.into())
|
||||
}
|
||||
Ok(Response::Done(r)) => {
|
||||
@@ -439,8 +440,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
|
||||
ProcessResponse{ resp: resp,
|
||||
iostate: IOState::Response,
|
||||
running: RunningState::Running,
|
||||
drain: None,
|
||||
_s: PhantomData, _h: PhantomData})
|
||||
drain: None, _s: PhantomData, _h: PhantomData})
|
||||
}
|
||||
|
||||
fn poll_io(mut self, io: &mut Writer, info: &mut PipelineInfo<S>)
|
||||
@@ -448,7 +448,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
|
||||
{
|
||||
if self.drain.is_none() && self.running != RunningState::Paused {
|
||||
// if task is paused, write buffer is probably full
|
||||
loop {
|
||||
'outter: loop {
|
||||
let result = match mem::replace(&mut self.iostate, IOState::Done) {
|
||||
IOState::Response => {
|
||||
let result = match io.start(info.req_mut().get_inner(), &mut self.resp) {
|
||||
@@ -459,6 +459,13 @@ impl<S: 'static, H> ProcessResponse<S, H> {
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(err) = self.resp.error() {
|
||||
warn!("Error occured during request handling: {}", err);
|
||||
if log_enabled!(Debug) {
|
||||
debug!("{:?}", err);
|
||||
}
|
||||
}
|
||||
|
||||
match self.resp.replace_body(Body::Empty) {
|
||||
Body::Streaming(stream) =>
|
||||
self.iostate = IOState::Payload(stream),
|
||||
@@ -481,7 +488,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
|
||||
},
|
||||
Ok(Async::Ready(Some(chunk))) => {
|
||||
self.iostate = IOState::Payload(body);
|
||||
match io.write(chunk.as_ref()) {
|
||||
match io.write(chunk.into()) {
|
||||
Err(err) => {
|
||||
info.error = Some(err.into());
|
||||
return Ok(FinishingMiddlewares::init(info, self.resp))
|
||||
@@ -504,35 +511,44 @@ impl<S: 'static, H> ProcessResponse<S, H> {
|
||||
ctx.disconnected();
|
||||
}
|
||||
match ctx.poll() {
|
||||
Ok(Async::Ready(Some(frame))) => {
|
||||
match frame {
|
||||
Frame::Chunk(None) => {
|
||||
info.context = Some(ctx);
|
||||
self.iostate = IOState::Done;
|
||||
if let Err(err) = io.write_eof() {
|
||||
info.error = Some(err.into());
|
||||
return Ok(
|
||||
FinishingMiddlewares::init(info, self.resp))
|
||||
}
|
||||
break
|
||||
},
|
||||
Frame::Chunk(Some(chunk)) => {
|
||||
self.iostate = IOState::Actor(ctx);
|
||||
match io.write(chunk.as_ref()) {
|
||||
Err(err) => {
|
||||
Ok(Async::Ready(Some(vec))) => {
|
||||
if vec.is_empty() {
|
||||
self.iostate = IOState::Actor(ctx);
|
||||
break
|
||||
}
|
||||
let mut res = None;
|
||||
for frame in vec {
|
||||
match frame {
|
||||
Frame::Chunk(None) => {
|
||||
info.context = Some(ctx);
|
||||
self.iostate = IOState::Done;
|
||||
if let Err(err) = io.write_eof() {
|
||||
info.error = Some(err.into());
|
||||
return Ok(
|
||||
FinishingMiddlewares::init(info, self.resp))
|
||||
},
|
||||
Ok(result) => result
|
||||
}
|
||||
},
|
||||
Frame::Drain(fut) => {
|
||||
self.drain = Some(fut);
|
||||
self.iostate = IOState::Actor(ctx);
|
||||
break
|
||||
}
|
||||
break 'outter
|
||||
},
|
||||
Frame::Chunk(Some(chunk)) => {
|
||||
match io.write(chunk) {
|
||||
Err(err) => {
|
||||
info.error = Some(err.into());
|
||||
return Ok(
|
||||
FinishingMiddlewares::init(info, self.resp))
|
||||
},
|
||||
Ok(result) => res = Some(result),
|
||||
}
|
||||
},
|
||||
Frame::Drain(fut) =>
|
||||
self.drain = Some(fut),
|
||||
}
|
||||
}
|
||||
self.iostate = IOState::Actor(ctx);
|
||||
if self.drain.is_some() {
|
||||
self.running.resume();
|
||||
break 'outter
|
||||
}
|
||||
res.unwrap()
|
||||
},
|
||||
Ok(Async::Ready(None)) => {
|
||||
self.iostate = IOState::Done;
|
||||
@@ -567,16 +583,6 @@ impl<S: 'static, H> ProcessResponse<S, H> {
|
||||
if self.running == RunningState::Paused || self.drain.is_some() {
|
||||
match io.poll_completed(false) {
|
||||
Ok(Async::Ready(_)) => {
|
||||
match io.flush() {
|
||||
Ok(Async::Ready(_)) => (),
|
||||
Ok(Async::NotReady) => return Err(PipelineState::Response(self)),
|
||||
Err(err) => {
|
||||
debug!("Error sending data: {}", err);
|
||||
info.error = Some(err.into());
|
||||
return Ok(FinishingMiddlewares::init(info, self.resp))
|
||||
}
|
||||
}
|
||||
|
||||
self.running.resume();
|
||||
|
||||
// resolve drain futures
|
||||
@@ -588,7 +594,6 @@ impl<S: 'static, H> ProcessResponse<S, H> {
|
||||
},
|
||||
Ok(Async::NotReady) => return Err(PipelineState::Response(self)),
|
||||
Err(err) => {
|
||||
debug!("Error sending data: {}", err);
|
||||
info.error = Some(err.into());
|
||||
return Ok(FinishingMiddlewares::init(info, self.resp))
|
||||
}
|
||||
@@ -601,7 +606,6 @@ impl<S: 'static, H> ProcessResponse<S, H> {
|
||||
match io.write_eof() {
|
||||
Ok(_) => (),
|
||||
Err(err) => {
|
||||
debug!("Error sending data: {}", err);
|
||||
info.error = Some(err.into());
|
||||
return Ok(FinishingMiddlewares::init(info, self.resp))
|
||||
}
|
||||
@@ -663,7 +667,7 @@ impl<S: 'static, H> FinishingMiddlewares<S, H> {
|
||||
self.fut = None;
|
||||
info.count -= 1;
|
||||
|
||||
match info.mws[info.count].finish(info.req_mut(), &self.resp) {
|
||||
match info.mws[info.count as usize].finish(info.req_mut(), &self.resp) {
|
||||
Finished::Done => {
|
||||
if info.count == 0 {
|
||||
return Some(Completed::init(info))
|
||||
@@ -677,12 +681,17 @@ impl<S: 'static, H> FinishingMiddlewares<S, H> {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Completed<S, H>(PhantomData<S>, PhantomData<H>);
|
||||
|
||||
impl<S, H> Completed<S, H> {
|
||||
|
||||
#[inline]
|
||||
fn init(info: &mut PipelineInfo<S>) -> PipelineState<S, H> {
|
||||
if let Some(ref err) = info.error {
|
||||
error!("Error occured during request handling: {}", err);
|
||||
}
|
||||
|
||||
if info.context.is_none() {
|
||||
PipelineState::None
|
||||
} else {
|
||||
|
@@ -19,7 +19,7 @@ use httpresponse::HttpResponse;
|
||||
/// Route uses builder-like pattern for configuration.
|
||||
/// During request handling, resource object iterate through all routes
|
||||
/// and check all predicates for specific route, if request matches all predicates route
|
||||
/// route considired matched and route handler get called.
|
||||
/// route considered matched and route handler get called.
|
||||
///
|
||||
/// ```rust
|
||||
/// # extern crate actix_web;
|
||||
|
@@ -3,7 +3,7 @@ use std::io::{Read, Write};
|
||||
use std::fmt::Write as FmtWrite;
|
||||
use std::str::FromStr;
|
||||
|
||||
use http::Version;
|
||||
use http::{Version, Method, HttpTryFrom};
|
||||
use http::header::{HeaderMap, HeaderValue,
|
||||
ACCEPT_ENCODING, CONNECTION,
|
||||
CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING};
|
||||
@@ -16,11 +16,13 @@ use bytes::{Bytes, BytesMut, BufMut, Writer};
|
||||
use headers::ContentEncoding;
|
||||
use body::{Body, Binary};
|
||||
use error::PayloadError;
|
||||
use helpers::SharedBytes;
|
||||
use httprequest::HttpMessage;
|
||||
use httpresponse::HttpResponse;
|
||||
use payload::{PayloadSender, PayloadWriter};
|
||||
|
||||
use super::shared::SharedBytes;
|
||||
|
||||
|
||||
impl ContentEncoding {
|
||||
|
||||
#[inline]
|
||||
@@ -344,15 +346,17 @@ impl PayloadEncoder {
|
||||
pub fn new(buf: SharedBytes, req: &HttpMessage, resp: &mut HttpResponse) -> PayloadEncoder {
|
||||
let version = resp.version().unwrap_or_else(|| req.version);
|
||||
let mut body = resp.replace_body(Body::Empty);
|
||||
let response_encoding = resp.content_encoding();
|
||||
let has_body = match body {
|
||||
Body::Empty => false,
|
||||
Body::Binary(ref bin) => bin.len() >= 512,
|
||||
Body::Binary(ref bin) =>
|
||||
!(response_encoding == ContentEncoding::Auto && bin.len() < 96),
|
||||
_ => true,
|
||||
};
|
||||
|
||||
// Enable content encoding only if response does not contain Content-Encoding header
|
||||
let mut encoding = if has_body {
|
||||
let encoding = match *resp.content_encoding() {
|
||||
let encoding = match response_encoding {
|
||||
ContentEncoding::Auto => {
|
||||
// negotiate content-encoding
|
||||
if let Some(val) = req.headers.get(ACCEPT_ENCODING) {
|
||||
@@ -376,13 +380,12 @@ impl PayloadEncoder {
|
||||
ContentEncoding::Identity
|
||||
};
|
||||
|
||||
let transfer = match body {
|
||||
let mut transfer = match body {
|
||||
Body::Empty => {
|
||||
if resp.chunked() {
|
||||
error!("Chunked transfer is enabled but body is set to Empty");
|
||||
if req.method != Method::HEAD {
|
||||
resp.headers_mut().remove(CONTENT_LENGTH);
|
||||
}
|
||||
resp.headers_mut().remove(CONTENT_LENGTH);
|
||||
TransferEncoding::eof(buf)
|
||||
TransferEncoding::length(0, buf)
|
||||
},
|
||||
Body::Binary(ref mut bytes) => {
|
||||
if encoding.is_compression() {
|
||||
@@ -399,13 +402,20 @@ impl PayloadEncoder {
|
||||
ContentEncoding::Auto => unreachable!()
|
||||
};
|
||||
// TODO return error!
|
||||
let _ = enc.write(bytes.as_ref());
|
||||
let _ = enc.write(bytes.clone());
|
||||
let _ = enc.write_eof();
|
||||
|
||||
*bytes = Binary::from(tmp.get_mut().take());
|
||||
*bytes = Binary::from(tmp.take());
|
||||
encoding = ContentEncoding::Identity;
|
||||
}
|
||||
resp.headers_mut().remove(CONTENT_LENGTH);
|
||||
if req.method == Method::HEAD {
|
||||
let mut b = BytesMut::new();
|
||||
let _ = write!(b, "{}", bytes.len());
|
||||
resp.headers_mut().insert(
|
||||
CONTENT_LENGTH, HeaderValue::try_from(b.freeze()).unwrap());
|
||||
} else {
|
||||
resp.headers_mut().remove(CONTENT_LENGTH);
|
||||
}
|
||||
TransferEncoding::eof(buf)
|
||||
}
|
||||
Body::Streaming(_) | Body::Actor(_) => {
|
||||
@@ -426,7 +436,12 @@ impl PayloadEncoder {
|
||||
}
|
||||
}
|
||||
};
|
||||
resp.replace_body(body);
|
||||
//
|
||||
if req.method == Method::HEAD {
|
||||
transfer.kind = TransferEncodingKind::Length(0);
|
||||
} else {
|
||||
resp.replace_body(body);
|
||||
}
|
||||
|
||||
PayloadEncoder(
|
||||
match encoding {
|
||||
@@ -444,44 +459,8 @@ impl PayloadEncoder {
|
||||
|
||||
fn streaming_encoding(buf: SharedBytes, version: Version,
|
||||
resp: &mut HttpResponse) -> TransferEncoding {
|
||||
if resp.chunked() {
|
||||
// Enable transfer encoding
|
||||
resp.headers_mut().remove(CONTENT_LENGTH);
|
||||
if version == Version::HTTP_2 {
|
||||
resp.headers_mut().remove(TRANSFER_ENCODING);
|
||||
TransferEncoding::eof(buf)
|
||||
} else {
|
||||
resp.headers_mut().insert(
|
||||
TRANSFER_ENCODING, HeaderValue::from_static("chunked"));
|
||||
TransferEncoding::chunked(buf)
|
||||
}
|
||||
} else {
|
||||
// if Content-Length is specified, then use it as length hint
|
||||
let (len, chunked) =
|
||||
if let Some(len) = resp.headers().get(CONTENT_LENGTH) {
|
||||
// Content-Length
|
||||
if let Ok(s) = len.to_str() {
|
||||
if let Ok(len) = s.parse::<u64>() {
|
||||
(Some(len), false)
|
||||
} else {
|
||||
error!("illegal Content-Length: {:?}", len);
|
||||
(None, false)
|
||||
}
|
||||
} else {
|
||||
error!("illegal Content-Length: {:?}", len);
|
||||
(None, false)
|
||||
}
|
||||
} else {
|
||||
(None, true)
|
||||
};
|
||||
|
||||
if !chunked {
|
||||
if let Some(len) = len {
|
||||
TransferEncoding::length(len, buf)
|
||||
} else {
|
||||
TransferEncoding::eof(buf)
|
||||
}
|
||||
} else {
|
||||
match resp.chunked() {
|
||||
Some(true) => {
|
||||
// Enable transfer encoding
|
||||
resp.headers_mut().remove(CONTENT_LENGTH);
|
||||
if version == Version::HTTP_2 {
|
||||
@@ -492,6 +471,49 @@ impl PayloadEncoder {
|
||||
TRANSFER_ENCODING, HeaderValue::from_static("chunked"));
|
||||
TransferEncoding::chunked(buf)
|
||||
}
|
||||
},
|
||||
Some(false) =>
|
||||
TransferEncoding::eof(buf),
|
||||
None => {
|
||||
// if Content-Length is specified, then use it as length hint
|
||||
let (len, chunked) =
|
||||
if let Some(len) = resp.headers().get(CONTENT_LENGTH) {
|
||||
// Content-Length
|
||||
if let Ok(s) = len.to_str() {
|
||||
if let Ok(len) = s.parse::<u64>() {
|
||||
(Some(len), false)
|
||||
} else {
|
||||
error!("illegal Content-Length: {:?}", len);
|
||||
(None, false)
|
||||
}
|
||||
} else {
|
||||
error!("illegal Content-Length: {:?}", len);
|
||||
(None, false)
|
||||
}
|
||||
} else {
|
||||
(None, true)
|
||||
};
|
||||
|
||||
if !chunked {
|
||||
if let Some(len) = len {
|
||||
TransferEncoding::length(len, buf)
|
||||
} else {
|
||||
TransferEncoding::eof(buf)
|
||||
}
|
||||
} else {
|
||||
// Enable transfer encoding
|
||||
match version {
|
||||
Version::HTTP_11 => {
|
||||
resp.headers_mut().insert(
|
||||
TRANSFER_ENCODING, HeaderValue::from_static("chunked"));
|
||||
TransferEncoding::chunked(buf)
|
||||
},
|
||||
_ => {
|
||||
resp.headers_mut().remove(TRANSFER_ENCODING);
|
||||
TransferEncoding::eof(buf)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -499,16 +521,6 @@ impl PayloadEncoder {
|
||||
|
||||
impl PayloadEncoder {
|
||||
|
||||
#[inline]
|
||||
pub fn len(&self) -> usize {
|
||||
self.0.get_ref().len()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn get_mut(&mut self) -> &mut BytesMut {
|
||||
self.0.get_mut()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn is_eof(&self) -> bool {
|
||||
self.0.is_eof()
|
||||
@@ -516,7 +528,7 @@ impl PayloadEncoder {
|
||||
|
||||
#[cfg_attr(feature = "cargo-clippy", allow(inline_always))]
|
||||
#[inline(always)]
|
||||
pub fn write(&mut self, payload: &[u8]) -> Result<(), io::Error> {
|
||||
pub fn write(&mut self, payload: Binary) -> Result<(), io::Error> {
|
||||
self.0.write(payload)
|
||||
}
|
||||
|
||||
@@ -539,42 +551,10 @@ impl ContentEncoder {
|
||||
#[inline]
|
||||
pub fn is_eof(&self) -> bool {
|
||||
match *self {
|
||||
ContentEncoder::Br(ref encoder) =>
|
||||
encoder.get_ref().is_eof(),
|
||||
ContentEncoder::Deflate(ref encoder) =>
|
||||
encoder.get_ref().is_eof(),
|
||||
ContentEncoder::Gzip(ref encoder) =>
|
||||
encoder.get_ref().is_eof(),
|
||||
ContentEncoder::Identity(ref encoder) =>
|
||||
encoder.is_eof(),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn get_ref(&self) -> &BytesMut {
|
||||
match *self {
|
||||
ContentEncoder::Br(ref encoder) =>
|
||||
encoder.get_ref().buffer.get_ref(),
|
||||
ContentEncoder::Deflate(ref encoder) =>
|
||||
encoder.get_ref().buffer.get_ref(),
|
||||
ContentEncoder::Gzip(ref encoder) =>
|
||||
encoder.get_ref().buffer.get_ref(),
|
||||
ContentEncoder::Identity(ref encoder) =>
|
||||
encoder.buffer.get_ref(),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn get_mut(&mut self) -> &mut BytesMut {
|
||||
match *self {
|
||||
ContentEncoder::Br(ref mut encoder) =>
|
||||
encoder.get_mut().buffer.get_mut(),
|
||||
ContentEncoder::Deflate(ref mut encoder) =>
|
||||
encoder.get_mut().buffer.get_mut(),
|
||||
ContentEncoder::Gzip(ref mut encoder) =>
|
||||
encoder.get_mut().buffer.get_mut(),
|
||||
ContentEncoder::Identity(ref mut encoder) =>
|
||||
encoder.buffer.get_mut(),
|
||||
ContentEncoder::Br(ref encoder) => encoder.get_ref().is_eof(),
|
||||
ContentEncoder::Deflate(ref encoder) => encoder.get_ref().is_eof(),
|
||||
ContentEncoder::Gzip(ref encoder) => encoder.get_ref().is_eof(),
|
||||
ContentEncoder::Identity(ref encoder) => encoder.is_eof(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -625,10 +605,10 @@ impl ContentEncoder {
|
||||
|
||||
#[cfg_attr(feature = "cargo-clippy", allow(inline_always))]
|
||||
#[inline(always)]
|
||||
pub fn write(&mut self, data: &[u8]) -> Result<(), io::Error> {
|
||||
pub fn write(&mut self, data: Binary) -> Result<(), io::Error> {
|
||||
match *self {
|
||||
ContentEncoder::Br(ref mut encoder) => {
|
||||
match encoder.write(data) {
|
||||
match encoder.write(data.as_ref()) {
|
||||
Ok(_) =>
|
||||
encoder.flush(),
|
||||
Err(err) => {
|
||||
@@ -638,7 +618,7 @@ impl ContentEncoder {
|
||||
}
|
||||
},
|
||||
ContentEncoder::Gzip(ref mut encoder) => {
|
||||
match encoder.write(data) {
|
||||
match encoder.write(data.as_ref()) {
|
||||
Ok(_) =>
|
||||
encoder.flush(),
|
||||
Err(err) => {
|
||||
@@ -648,7 +628,7 @@ impl ContentEncoder {
|
||||
}
|
||||
}
|
||||
ContentEncoder::Deflate(ref mut encoder) => {
|
||||
match encoder.write(data) {
|
||||
match encoder.write(data.as_ref()) {
|
||||
Ok(_) =>
|
||||
encoder.flush(),
|
||||
Err(err) => {
|
||||
@@ -682,7 +662,7 @@ enum TransferEncodingKind {
|
||||
Length(u64),
|
||||
/// An Encoder for when Content-Length is not known.
|
||||
///
|
||||
/// Appliction decides when to stop writing.
|
||||
/// Application decides when to stop writing.
|
||||
Eof,
|
||||
}
|
||||
|
||||
@@ -723,11 +703,12 @@ impl TransferEncoding {
|
||||
|
||||
/// Encode message. Return `EOF` state of encoder
|
||||
#[inline]
|
||||
pub fn encode(&mut self, msg: &[u8]) -> io::Result<bool> {
|
||||
pub fn encode(&mut self, mut msg: Binary) -> io::Result<bool> {
|
||||
match self.kind {
|
||||
TransferEncodingKind::Eof => {
|
||||
self.buffer.get_mut().extend_from_slice(msg);
|
||||
Ok(msg.is_empty())
|
||||
let eof = msg.is_empty();
|
||||
self.buffer.extend(msg);
|
||||
Ok(eof)
|
||||
},
|
||||
TransferEncodingKind::Chunked(ref mut eof) => {
|
||||
if *eof {
|
||||
@@ -736,24 +717,31 @@ impl TransferEncoding {
|
||||
|
||||
if msg.is_empty() {
|
||||
*eof = true;
|
||||
self.buffer.get_mut().extend_from_slice(b"0\r\n\r\n");
|
||||
self.buffer.extend_from_slice(b"0\r\n\r\n");
|
||||
} else {
|
||||
write!(self.buffer.get_mut(), "{:X}\r\n", msg.len())
|
||||
let mut buf = BytesMut::new();
|
||||
write!(&mut buf, "{:X}\r\n", msg.len())
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
|
||||
self.buffer.get_mut().extend_from_slice(msg);
|
||||
self.buffer.get_mut().extend_from_slice(b"\r\n");
|
||||
self.buffer.reserve(buf.len() + msg.len() + 2);
|
||||
self.buffer.extend(buf.into());
|
||||
self.buffer.extend(msg);
|
||||
self.buffer.extend_from_slice(b"\r\n");
|
||||
}
|
||||
Ok(*eof)
|
||||
},
|
||||
TransferEncodingKind::Length(ref mut remaining) => {
|
||||
if msg.is_empty() {
|
||||
return Ok(*remaining == 0)
|
||||
}
|
||||
let max = cmp::min(*remaining, msg.len() as u64);
|
||||
self.buffer.get_mut().extend_from_slice(msg[..max as usize].as_ref());
|
||||
if *remaining > 0 {
|
||||
if msg.is_empty() {
|
||||
return Ok(*remaining == 0)
|
||||
}
|
||||
let len = cmp::min(*remaining, msg.len() as u64);
|
||||
self.buffer.extend(msg.take().split_to(len as usize).into());
|
||||
|
||||
*remaining -= max as u64;
|
||||
Ok(*remaining == 0)
|
||||
*remaining -= len as u64;
|
||||
Ok(*remaining == 0)
|
||||
} else {
|
||||
Ok(true)
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -766,7 +754,7 @@ impl TransferEncoding {
|
||||
TransferEncodingKind::Chunked(ref mut eof) => {
|
||||
if !*eof {
|
||||
*eof = true;
|
||||
self.buffer.get_mut().extend_from_slice(b"0\r\n\r\n");
|
||||
self.buffer.extend_from_slice(b"0\r\n\r\n");
|
||||
}
|
||||
},
|
||||
}
|
||||
@@ -777,7 +765,7 @@ impl io::Write for TransferEncoding {
|
||||
|
||||
#[inline]
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
self.encode(buf)?;
|
||||
self.encode(Binary::from_slice(buf))?;
|
||||
Ok(buf.len())
|
||||
}
|
||||
|
||||
@@ -863,8 +851,8 @@ mod tests {
|
||||
fn test_chunked_te() {
|
||||
let bytes = SharedBytes::default();
|
||||
let mut enc = TransferEncoding::chunked(bytes.clone());
|
||||
assert!(!enc.encode(b"test").ok().unwrap());
|
||||
assert!(enc.encode(b"").ok().unwrap());
|
||||
assert!(!enc.encode(Binary::from(b"test".as_ref())).ok().unwrap());
|
||||
assert!(enc.encode(Binary::from(b"".as_ref())).ok().unwrap());
|
||||
assert_eq!(bytes.get_mut().take().freeze(),
|
||||
Bytes::from_static(b"4\r\ntest\r\n0\r\n\r\n"));
|
||||
}
|
||||
|
@@ -96,12 +96,12 @@ impl<T, H> Http1<T, H>
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: refacrtor
|
||||
// TODO: refactor
|
||||
#[cfg_attr(feature = "cargo-clippy", allow(cyclomatic_complexity))]
|
||||
pub fn poll(&mut self) -> Poll<(), ()> {
|
||||
// keep-alive timer
|
||||
if self.keepalive_timer.is_some() {
|
||||
match self.keepalive_timer.as_mut().unwrap().poll() {
|
||||
if let Some(ref mut timer) = self.keepalive_timer {
|
||||
match timer.poll() {
|
||||
Ok(Async::Ready(_)) => {
|
||||
trace!("Keep-alive timeout, close connection");
|
||||
return Ok(Async::Ready(()))
|
||||
@@ -133,7 +133,7 @@ impl<T, H> Http1<T, H>
|
||||
Ok(Async::Ready(ready)) => {
|
||||
not_ready = false;
|
||||
|
||||
// overide keep-alive state
|
||||
// override keep-alive state
|
||||
if self.stream.keepalive() {
|
||||
self.flags.insert(Flags::KEEPALIVE);
|
||||
} else {
|
||||
@@ -146,10 +146,8 @@ impl<T, H> Http1<T, H>
|
||||
item.flags.insert(EntryFlags::FINISHED);
|
||||
}
|
||||
},
|
||||
Ok(Async::NotReady) => {
|
||||
// no more IO for this iteration
|
||||
io = true;
|
||||
},
|
||||
// no more IO for this iteration
|
||||
Ok(Async::NotReady) => io = true,
|
||||
Err(err) => {
|
||||
// it is not possible to recover from error
|
||||
// during pipe handling, so just drop connection
|
||||
@@ -227,38 +225,7 @@ impl<T, H> Http1<T, H>
|
||||
self.tasks.push_back(
|
||||
Entry {pipe: pipe.unwrap_or_else(|| Pipeline::error(HTTPNotFound)),
|
||||
flags: EntryFlags::empty()});
|
||||
}
|
||||
Err(ReaderError::Disconnect) => {
|
||||
not_ready = false;
|
||||
self.flags.insert(Flags::ERROR);
|
||||
self.stream.disconnected();
|
||||
for entry in &mut self.tasks {
|
||||
entry.pipe.disconnected()
|
||||
}
|
||||
},
|
||||
Err(err) => {
|
||||
// notify all tasks
|
||||
not_ready = false;
|
||||
self.stream.disconnected();
|
||||
for entry in &mut self.tasks {
|
||||
entry.pipe.disconnected()
|
||||
}
|
||||
|
||||
// kill keepalive
|
||||
self.flags.remove(Flags::KEEPALIVE);
|
||||
self.keepalive_timer.take();
|
||||
|
||||
// on parse error, stop reading stream but tasks need to be completed
|
||||
self.flags.insert(Flags::ERROR);
|
||||
|
||||
if self.tasks.is_empty() {
|
||||
if let ReaderError::Error(err) = err {
|
||||
self.tasks.push_back(
|
||||
Entry {pipe: Pipeline::error(err.error_response()),
|
||||
flags: EntryFlags::empty()});
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(Async::NotReady) => {
|
||||
// start keep-alive timer, this also is slow request timeout
|
||||
if self.tasks.is_empty() {
|
||||
@@ -293,7 +260,38 @@ impl<T, H> Http1<T, H>
|
||||
}
|
||||
}
|
||||
break
|
||||
}
|
||||
},
|
||||
Err(ReaderError::Disconnect) => {
|
||||
not_ready = false;
|
||||
self.flags.insert(Flags::ERROR);
|
||||
self.stream.disconnected();
|
||||
for entry in &mut self.tasks {
|
||||
entry.pipe.disconnected()
|
||||
}
|
||||
},
|
||||
Err(err) => {
|
||||
// notify all tasks
|
||||
not_ready = false;
|
||||
self.stream.disconnected();
|
||||
for entry in &mut self.tasks {
|
||||
entry.pipe.disconnected()
|
||||
}
|
||||
|
||||
// kill keepalive
|
||||
self.flags.remove(Flags::KEEPALIVE);
|
||||
self.keepalive_timer.take();
|
||||
|
||||
// on parse error, stop reading stream but tasks need to be completed
|
||||
self.flags.insert(Flags::ERROR);
|
||||
|
||||
if self.tasks.is_empty() {
|
||||
if let ReaderError::Error(err) = err {
|
||||
self.tasks.push_back(
|
||||
Entry {pipe: Pipeline::error(err.error_response()),
|
||||
flags: EntryFlags::empty()});
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1204,6 +1202,7 @@ mod tests {
|
||||
panic!("Error");
|
||||
}
|
||||
|
||||
// type in chunked
|
||||
let mut buf = Buffer::new(
|
||||
"GET /test HTTP/1.1\r\n\
|
||||
transfer-encoding: chnked\r\n\r\n");
|
||||
|
@@ -2,15 +2,15 @@ use std::io;
|
||||
use bytes::BufMut;
|
||||
use futures::{Async, Poll};
|
||||
use tokio_io::AsyncWrite;
|
||||
use http::Version;
|
||||
use http::{Method, Version};
|
||||
use http::header::{HeaderValue, CONNECTION, DATE};
|
||||
|
||||
use helpers;
|
||||
use body::Body;
|
||||
use helpers::SharedBytes;
|
||||
use body::{Body, Binary};
|
||||
use httprequest::HttpMessage;
|
||||
use httpresponse::HttpResponse;
|
||||
use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE};
|
||||
use super::shared::SharedBytes;
|
||||
use super::encoding::PayloadEncoder;
|
||||
|
||||
const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific
|
||||
@@ -56,23 +56,25 @@ impl<T: AsyncWrite> H1Writer<T> {
|
||||
}
|
||||
|
||||
pub fn disconnected(&mut self) {
|
||||
self.encoder.get_mut().take();
|
||||
self.buffer.take();
|
||||
}
|
||||
|
||||
pub fn keepalive(&self) -> bool {
|
||||
self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE)
|
||||
}
|
||||
|
||||
fn write_to_stream(&mut self) -> Result<WriterState, io::Error> {
|
||||
let buffer = self.encoder.get_mut();
|
||||
|
||||
while !buffer.is_empty() {
|
||||
match self.stream.write(buffer.as_ref()) {
|
||||
fn write_to_stream(&mut self) -> io::Result<WriterState> {
|
||||
while !self.buffer.is_empty() {
|
||||
match self.stream.write(self.buffer.as_ref()) {
|
||||
Ok(0) => {
|
||||
self.disconnected();
|
||||
return Ok(WriterState::Done);
|
||||
},
|
||||
Ok(n) => {
|
||||
let _ = buffer.split_to(n);
|
||||
let _ = self.buffer.split_to(n);
|
||||
},
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||
if buffer.len() > MAX_WRITE_BUFFER_SIZE {
|
||||
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
|
||||
return Ok(WriterState::Pause)
|
||||
} else {
|
||||
return Ok(WriterState::Done)
|
||||
@@ -92,23 +94,7 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
|
||||
self.written
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn flush(&mut self) -> Poll<(), io::Error> {
|
||||
match self.stream.flush() {
|
||||
Ok(_) => Ok(Async::Ready(())),
|
||||
Err(e) => {
|
||||
if e.kind() == io::ErrorKind::WouldBlock {
|
||||
Ok(Async::NotReady)
|
||||
} else {
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn start(&mut self, req: &mut HttpMessage, msg: &mut HttpResponse)
|
||||
-> Result<WriterState, io::Error>
|
||||
{
|
||||
fn start(&mut self, req: &mut HttpMessage, msg: &mut HttpResponse) -> io::Result<WriterState> {
|
||||
// prepare task
|
||||
self.flags.insert(Flags::STARTED);
|
||||
self.encoder = PayloadEncoder::new(self.buffer.clone(), req, msg);
|
||||
@@ -133,7 +119,7 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
|
||||
|
||||
// render message
|
||||
{
|
||||
let mut buffer = self.encoder.get_mut();
|
||||
let mut buffer = self.buffer.get_mut();
|
||||
if let Body::Binary(ref bytes) = body {
|
||||
buffer.reserve(256 + msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len());
|
||||
} else {
|
||||
@@ -146,7 +132,11 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
|
||||
|
||||
match body {
|
||||
Body::Empty =>
|
||||
buffer.extend_from_slice(b"\r\ncontent-length: 0\r\n"),
|
||||
if req.method != Method::HEAD {
|
||||
buffer.extend_from_slice(b"\r\ncontent-length: 0\r\n");
|
||||
} else {
|
||||
buffer.extend_from_slice(b"\r\n");
|
||||
},
|
||||
Body::Binary(ref bytes) =>
|
||||
helpers::write_content_length(bytes.len(), &mut buffer),
|
||||
_ =>
|
||||
@@ -176,14 +166,14 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
|
||||
|
||||
if let Body::Binary(bytes) = body {
|
||||
self.written = bytes.len() as u64;
|
||||
self.encoder.write(bytes.as_ref())?;
|
||||
self.encoder.write(bytes)?;
|
||||
} else {
|
||||
msg.replace_body(body);
|
||||
}
|
||||
Ok(WriterState::Done)
|
||||
}
|
||||
|
||||
fn write(&mut self, payload: &[u8]) -> Result<WriterState, io::Error> {
|
||||
fn write(&mut self, payload: Binary) -> io::Result<WriterState> {
|
||||
self.written += payload.len() as u64;
|
||||
if !self.flags.contains(Flags::DISCONNECTED) {
|
||||
if self.flags.contains(Flags::STARTED) {
|
||||
@@ -192,24 +182,24 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
|
||||
return Ok(WriterState::Done)
|
||||
} else {
|
||||
// might be response to EXCEPT
|
||||
self.encoder.get_mut().extend_from_slice(payload)
|
||||
self.buffer.extend_from_slice(payload.as_ref())
|
||||
}
|
||||
}
|
||||
|
||||
if self.encoder.len() > MAX_WRITE_BUFFER_SIZE {
|
||||
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
|
||||
Ok(WriterState::Pause)
|
||||
} else {
|
||||
Ok(WriterState::Done)
|
||||
}
|
||||
}
|
||||
|
||||
fn write_eof(&mut self) -> Result<WriterState, io::Error> {
|
||||
fn write_eof(&mut self) -> io::Result<WriterState> {
|
||||
self.encoder.write_eof()?;
|
||||
|
||||
if !self.encoder.is_eof() {
|
||||
Err(io::Error::new(io::ErrorKind::Other,
|
||||
"Last payload item, but eof is not reached"))
|
||||
} else if self.encoder.len() > MAX_WRITE_BUFFER_SIZE {
|
||||
} else if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
|
||||
Ok(WriterState::Pause)
|
||||
} else {
|
||||
Ok(WriterState::Done)
|
||||
|
@@ -7,11 +7,11 @@ use http::{Version, HttpTryFrom, Response};
|
||||
use http::header::{HeaderValue, CONNECTION, TRANSFER_ENCODING, DATE, CONTENT_LENGTH};
|
||||
|
||||
use helpers;
|
||||
use body::Body;
|
||||
use helpers::SharedBytes;
|
||||
use body::{Body, Binary};
|
||||
use httprequest::HttpMessage;
|
||||
use httpresponse::HttpResponse;
|
||||
use super::encoding::PayloadEncoder;
|
||||
use super::shared::SharedBytes;
|
||||
use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE};
|
||||
|
||||
const CHUNK_SIZE: usize = 16_384;
|
||||
@@ -52,15 +52,13 @@ impl H2Writer {
|
||||
}
|
||||
}
|
||||
|
||||
fn write_to_stream(&mut self) -> Result<WriterState, io::Error> {
|
||||
fn write_to_stream(&mut self) -> io::Result<WriterState> {
|
||||
if !self.flags.contains(Flags::STARTED) {
|
||||
return Ok(WriterState::Done)
|
||||
}
|
||||
|
||||
if let Some(ref mut stream) = self.stream {
|
||||
let buffer = self.encoder.get_mut();
|
||||
|
||||
if buffer.is_empty() {
|
||||
if self.buffer.is_empty() {
|
||||
if self.flags.contains(Flags::EOF) {
|
||||
let _ = stream.send_data(Bytes::new(), true);
|
||||
}
|
||||
@@ -70,7 +68,7 @@ impl H2Writer {
|
||||
loop {
|
||||
match stream.poll_capacity() {
|
||||
Ok(Async::NotReady) => {
|
||||
if buffer.len() > MAX_WRITE_BUFFER_SIZE {
|
||||
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
|
||||
return Ok(WriterState::Pause)
|
||||
} else {
|
||||
return Ok(WriterState::Done)
|
||||
@@ -80,15 +78,15 @@ impl H2Writer {
|
||||
return Ok(WriterState::Done)
|
||||
}
|
||||
Ok(Async::Ready(Some(cap))) => {
|
||||
let len = buffer.len();
|
||||
let bytes = buffer.split_to(cmp::min(cap, len));
|
||||
let eof = buffer.is_empty() && self.flags.contains(Flags::EOF);
|
||||
let len = self.buffer.len();
|
||||
let bytes = self.buffer.split_to(cmp::min(cap, len));
|
||||
let eof = self.buffer.is_empty() && self.flags.contains(Flags::EOF);
|
||||
self.written += bytes.len() as u64;
|
||||
|
||||
if let Err(err) = stream.send_data(bytes.freeze(), eof) {
|
||||
return Err(io::Error::new(io::ErrorKind::Other, err))
|
||||
} else if !buffer.is_empty() {
|
||||
let cap = cmp::min(buffer.len(), CHUNK_SIZE);
|
||||
} else if !self.buffer.is_empty() {
|
||||
let cap = cmp::min(self.buffer.len(), CHUNK_SIZE);
|
||||
stream.reserve_capacity(cap);
|
||||
} else {
|
||||
return Ok(WriterState::Pause)
|
||||
@@ -110,16 +108,7 @@ impl Writer for H2Writer {
|
||||
self.written
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn flush(&mut self) -> Poll<(), io::Error> {
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
|
||||
fn start(&mut self, req: &mut HttpMessage, msg: &mut HttpResponse)
|
||||
-> Result<WriterState, io::Error>
|
||||
{
|
||||
// trace!("Prepare response with status: {:?}", msg.status());
|
||||
|
||||
fn start(&mut self, req: &mut HttpMessage, msg: &mut HttpResponse) -> io::Result<WriterState> {
|
||||
// prepare response
|
||||
self.flags.insert(Flags::STARTED);
|
||||
self.encoder = PayloadEncoder::new(self.buffer.clone(), req, msg);
|
||||
@@ -172,9 +161,9 @@ impl Writer for H2Writer {
|
||||
if let Body::Binary(bytes) = body {
|
||||
self.flags.insert(Flags::EOF);
|
||||
self.written = bytes.len() as u64;
|
||||
self.encoder.write(bytes.as_ref())?;
|
||||
self.encoder.write(bytes)?;
|
||||
if let Some(ref mut stream) = self.stream {
|
||||
stream.reserve_capacity(cmp::min(self.encoder.len(), CHUNK_SIZE));
|
||||
stream.reserve_capacity(cmp::min(self.buffer.len(), CHUNK_SIZE));
|
||||
}
|
||||
Ok(WriterState::Pause)
|
||||
} else {
|
||||
@@ -183,7 +172,7 @@ impl Writer for H2Writer {
|
||||
}
|
||||
}
|
||||
|
||||
fn write(&mut self, payload: &[u8]) -> Result<WriterState, io::Error> {
|
||||
fn write(&mut self, payload: Binary) -> io::Result<WriterState> {
|
||||
self.written = payload.len() as u64;
|
||||
|
||||
if !self.flags.contains(Flags::DISCONNECTED) {
|
||||
@@ -192,25 +181,25 @@ impl Writer for H2Writer {
|
||||
self.encoder.write(payload)?;
|
||||
} else {
|
||||
// might be response for EXCEPT
|
||||
self.encoder.get_mut().extend_from_slice(payload)
|
||||
self.buffer.extend_from_slice(payload.as_ref())
|
||||
}
|
||||
}
|
||||
|
||||
if self.encoder.len() > MAX_WRITE_BUFFER_SIZE {
|
||||
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
|
||||
Ok(WriterState::Pause)
|
||||
} else {
|
||||
Ok(WriterState::Done)
|
||||
}
|
||||
}
|
||||
|
||||
fn write_eof(&mut self) -> Result<WriterState, io::Error> {
|
||||
fn write_eof(&mut self) -> io::Result<WriterState> {
|
||||
self.encoder.write_eof()?;
|
||||
|
||||
self.flags.insert(Flags::EOF);
|
||||
if !self.encoder.is_eof() {
|
||||
Err(io::Error::new(io::ErrorKind::Other,
|
||||
"Last payload item, but eof is not reached"))
|
||||
} else if self.encoder.len() > MAX_WRITE_BUFFER_SIZE {
|
||||
} else if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
|
||||
Ok(WriterState::Pause)
|
||||
} else {
|
||||
Ok(WriterState::Done)
|
||||
|
@@ -15,11 +15,13 @@ mod h2;
|
||||
mod h1writer;
|
||||
mod h2writer;
|
||||
mod settings;
|
||||
mod shared;
|
||||
mod utils;
|
||||
|
||||
pub use self::srv::HttpServer;
|
||||
pub use self::settings::ServerSettings;
|
||||
|
||||
use body::Binary;
|
||||
use error::Error;
|
||||
use httprequest::{HttpMessage, HttpRequest};
|
||||
use httpresponse::HttpResponse;
|
||||
@@ -54,6 +56,12 @@ pub trait HttpHandler: 'static {
|
||||
fn handle(&mut self, req: HttpRequest) -> Result<Box<HttpHandlerTask>, HttpRequest>;
|
||||
}
|
||||
|
||||
impl HttpHandler for Box<HttpHandler> {
|
||||
fn handle(&mut self, req: HttpRequest) -> Result<Box<HttpHandlerTask>, HttpRequest> {
|
||||
self.as_mut().handle(req)
|
||||
}
|
||||
}
|
||||
|
||||
pub trait HttpHandlerTask {
|
||||
|
||||
fn poll(&mut self) -> Poll<(), Error>;
|
||||
@@ -90,14 +98,11 @@ pub enum WriterState {
|
||||
pub trait Writer {
|
||||
fn written(&self) -> u64;
|
||||
|
||||
fn start(&mut self, req: &mut HttpMessage, resp: &mut HttpResponse)
|
||||
-> Result<WriterState, io::Error>;
|
||||
fn start(&mut self, req: &mut HttpMessage, resp: &mut HttpResponse) -> io::Result<WriterState>;
|
||||
|
||||
fn write(&mut self, payload: &[u8]) -> Result<WriterState, io::Error>;
|
||||
fn write(&mut self, payload: Binary) -> io::Result<WriterState>;
|
||||
|
||||
fn write_eof(&mut self) -> Result<WriterState, io::Error>;
|
||||
|
||||
fn flush(&mut self) -> Poll<(), io::Error>;
|
||||
fn write_eof(&mut self) -> io::Result<WriterState>;
|
||||
|
||||
fn poll_completed(&mut self, shutdown: bool) -> Poll<(), io::Error>;
|
||||
}
|
||||
|
@@ -4,6 +4,7 @@ use std::cell::{Cell, RefCell, RefMut};
|
||||
|
||||
use helpers;
|
||||
use super::channel::Node;
|
||||
use super::shared::{SharedBytes, SharedBytesPool};
|
||||
|
||||
/// Various server settings
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -63,7 +64,7 @@ pub(crate) struct WorkerSettings<H> {
|
||||
h: RefCell<Vec<H>>,
|
||||
enabled: bool,
|
||||
keep_alive: u64,
|
||||
bytes: Rc<helpers::SharedBytesPool>,
|
||||
bytes: Rc<SharedBytesPool>,
|
||||
messages: Rc<helpers::SharedMessagePool>,
|
||||
channels: Cell<usize>,
|
||||
node: Node<()>,
|
||||
@@ -75,7 +76,7 @@ impl<H> WorkerSettings<H> {
|
||||
h: RefCell::new(h),
|
||||
enabled: if let Some(ka) = keep_alive { ka > 0 } else { false },
|
||||
keep_alive: keep_alive.unwrap_or(0),
|
||||
bytes: Rc::new(helpers::SharedBytesPool::new()),
|
||||
bytes: Rc::new(SharedBytesPool::new()),
|
||||
messages: Rc::new(helpers::SharedMessagePool::new()),
|
||||
channels: Cell::new(0),
|
||||
node: Node::head(),
|
||||
@@ -102,8 +103,8 @@ impl<H> WorkerSettings<H> {
|
||||
self.enabled
|
||||
}
|
||||
|
||||
pub fn get_shared_bytes(&self) -> helpers::SharedBytes {
|
||||
helpers::SharedBytes::new(self.bytes.get_bytes(), Rc::clone(&self.bytes))
|
||||
pub fn get_shared_bytes(&self) -> SharedBytes {
|
||||
SharedBytes::new(self.bytes.get_bytes(), Rc::clone(&self.bytes))
|
||||
}
|
||||
|
||||
pub fn get_http_message(&self) -> helpers::SharedHttpMessage {
|
||||
|
120
src/server/shared.rs
Normal file
120
src/server/shared.rs
Normal file
@@ -0,0 +1,120 @@
|
||||
use std::mem;
|
||||
use std::cell::RefCell;
|
||||
use std::rc::Rc;
|
||||
use std::collections::VecDeque;
|
||||
use bytes::BytesMut;
|
||||
|
||||
use body::Binary;
|
||||
|
||||
|
||||
/// Internal use only! unsafe
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct SharedBytesPool(RefCell<VecDeque<Rc<BytesMut>>>);
|
||||
|
||||
impl SharedBytesPool {
|
||||
pub fn new() -> SharedBytesPool {
|
||||
SharedBytesPool(RefCell::new(VecDeque::with_capacity(128)))
|
||||
}
|
||||
|
||||
pub fn get_bytes(&self) -> Rc<BytesMut> {
|
||||
if let Some(bytes) = self.0.borrow_mut().pop_front() {
|
||||
bytes
|
||||
} else {
|
||||
Rc::new(BytesMut::new())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn release_bytes(&self, mut bytes: Rc<BytesMut>) {
|
||||
let v = &mut self.0.borrow_mut();
|
||||
if v.len() < 128 {
|
||||
Rc::get_mut(&mut bytes).unwrap().take();
|
||||
v.push_front(bytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct SharedBytes(
|
||||
Option<Rc<BytesMut>>, Option<Rc<SharedBytesPool>>);
|
||||
|
||||
impl Drop for SharedBytes {
|
||||
fn drop(&mut self) {
|
||||
if let Some(ref pool) = self.1 {
|
||||
if let Some(bytes) = self.0.take() {
|
||||
if Rc::strong_count(&bytes) == 1 {
|
||||
pool.release_bytes(bytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SharedBytes {
|
||||
|
||||
pub fn empty() -> Self {
|
||||
SharedBytes(None, None)
|
||||
}
|
||||
|
||||
pub fn new(bytes: Rc<BytesMut>, pool: Rc<SharedBytesPool>) -> SharedBytes {
|
||||
SharedBytes(Some(bytes), Some(pool))
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
#[allow(mutable_transmutes)]
|
||||
#[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref, inline_always))]
|
||||
pub fn get_mut(&self) -> &mut BytesMut {
|
||||
let r: &BytesMut = self.0.as_ref().unwrap().as_ref();
|
||||
unsafe{mem::transmute(r)}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn len(&self) -> usize {
|
||||
self.0.as_ref().unwrap().len()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.0.as_ref().unwrap().is_empty()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn as_ref(&self) -> &[u8] {
|
||||
self.0.as_ref().unwrap().as_ref()
|
||||
}
|
||||
|
||||
pub fn split_to(&self, n: usize) -> BytesMut {
|
||||
self.get_mut().split_to(n)
|
||||
}
|
||||
|
||||
pub fn take(&self) -> BytesMut {
|
||||
self.get_mut().take()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn reserve(&self, cnt: usize) {
|
||||
self.get_mut().reserve(cnt)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
#[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))]
|
||||
pub fn extend(&self, data: Binary) {
|
||||
self.get_mut().extend_from_slice(data.as_ref());
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn extend_from_slice(&self, data: &[u8]) {
|
||||
self.get_mut().extend_from_slice(data);
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for SharedBytes {
|
||||
fn default() -> Self {
|
||||
SharedBytes(Some(Rc::new(BytesMut::new())), None)
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for SharedBytes {
|
||||
fn clone(&self) -> SharedBytes {
|
||||
SharedBytes(self.0.clone(), self.1.clone())
|
||||
}
|
||||
}
|
@@ -268,9 +268,9 @@ impl<H: HttpHandler, U, V> HttpServer<TcpStream, net::SocketAddr, H, U>
|
||||
where U: IntoIterator<Item=V> + 'static,
|
||||
V: IntoHttpHandler<Handler=H>,
|
||||
{
|
||||
/// Start listening for incomming connections.
|
||||
/// Start listening for incoming connections.
|
||||
///
|
||||
/// This method starts number of http handler workers in seperate threads.
|
||||
/// This method starts number of http handler workers in separate threads.
|
||||
/// For each address this method starts separate thread which does `accept()` in a loop.
|
||||
///
|
||||
/// This methods panics if no socket addresses get bound.
|
||||
@@ -298,7 +298,7 @@ impl<H: HttpHandler, U, V> HttpServer<TcpStream, net::SocketAddr, H, U>
|
||||
pub fn start(mut self) -> SyncAddress<Self>
|
||||
{
|
||||
if self.sockets.is_empty() {
|
||||
panic!("HttpServer::bind() has to be called befor start()");
|
||||
panic!("HttpServer::bind() has to be called before start()");
|
||||
} else {
|
||||
let addrs: Vec<(net::SocketAddr, net::TcpListener)> =
|
||||
self.sockets.drain().collect();
|
||||
@@ -320,7 +320,7 @@ impl<H: HttpHandler, U, V> HttpServer<TcpStream, net::SocketAddr, H, U>
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn new thread and start listening for incomming connections.
|
||||
/// Spawn new thread and start listening for incoming connections.
|
||||
///
|
||||
/// This method spawns new thread and starts new actix system. Other than that it is
|
||||
/// similar to `start()` method. This method blocks.
|
||||
@@ -359,7 +359,7 @@ impl<H: HttpHandler, U, V> HttpServer<TlsStream<TcpStream>, net::SocketAddr, H,
|
||||
where U: IntoIterator<Item=V> + 'static,
|
||||
V: IntoHttpHandler<Handler=H>,
|
||||
{
|
||||
/// Start listening for incomming tls connections.
|
||||
/// Start listening for incoming tls connections.
|
||||
pub fn start_tls(mut self, pkcs12: ::Pkcs12) -> io::Result<SyncAddress<Self>> {
|
||||
if self.sockets.is_empty() {
|
||||
Err(io::Error::new(io::ErrorKind::Other, "No socket addresses are bound"))
|
||||
@@ -398,7 +398,7 @@ impl<H: HttpHandler, U, V> HttpServer<SslStream<TcpStream>, net::SocketAddr, H,
|
||||
where U: IntoIterator<Item=V> + 'static,
|
||||
V: IntoHttpHandler<Handler=H>,
|
||||
{
|
||||
/// Start listening for incomming tls connections.
|
||||
/// Start listening for incoming tls connections.
|
||||
///
|
||||
/// This method sets alpn protocols to "h2" and "http/1.1"
|
||||
pub fn start_ssl(mut self, identity: &ParsedPkcs12) -> io::Result<SyncAddress<Self>> {
|
||||
@@ -443,7 +443,7 @@ impl<T, A, H, U, V> HttpServer<WrapperStream<T>, A, H, U>
|
||||
U: IntoIterator<Item=V> + 'static,
|
||||
V: IntoHttpHandler<Handler=H>,
|
||||
{
|
||||
/// Start listening for incomming connections from a stream.
|
||||
/// Start listening for incoming connections from a stream.
|
||||
///
|
||||
/// This method uses only one thread for handling incoming connections.
|
||||
pub fn start_incoming<S>(mut self, stream: S, secure: bool) -> SyncAddress<Self>
|
||||
@@ -663,7 +663,7 @@ fn start_accept_thread(sock: net::TcpListener, addr: net::SocketAddr, backlog: i
|
||||
}
|
||||
}
|
||||
|
||||
// Start listening for incommin commands
|
||||
// Start listening for incoming commands
|
||||
if let Err(err) = poll.register(®, CMD,
|
||||
mio::Ready::readable(), mio::PollOpt::edge()) {
|
||||
panic!("Can not register Registration: {}", err);
|
||||
|
10
src/test.rs
10
src/test.rs
@@ -29,7 +29,7 @@ use server::{HttpServer, HttpHandler, IntoHttpHandler, ServerSettings};
|
||||
/// The `TestServer` type.
|
||||
///
|
||||
/// `TestServer` is very simple test server that simplify process of writing
|
||||
/// integrational tests cases for actix web applications.
|
||||
/// integration tests cases for actix web applications.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
@@ -61,7 +61,7 @@ impl TestServer {
|
||||
|
||||
/// Start new test server
|
||||
///
|
||||
/// This methos accepts configuration method. You can add
|
||||
/// This method accepts configuration method. You can add
|
||||
/// middlewares or set handlers for test application.
|
||||
pub fn new<F>(config: F) -> Self
|
||||
where F: Sync + Send + 'static + Fn(&mut TestApp<()>),
|
||||
@@ -101,7 +101,7 @@ impl TestServer {
|
||||
|
||||
/// Start new test server with custom application state
|
||||
///
|
||||
/// This methos accepts state factory and configuration method.
|
||||
/// This method accepts state factory and configuration method.
|
||||
pub fn with_state<S, FS, F>(state: FS, config: F) -> Self
|
||||
where S: 'static,
|
||||
FS: Sync + Send + 'static + Fn() -> S,
|
||||
@@ -287,12 +287,12 @@ impl Default for TestRequest<()> {
|
||||
|
||||
impl TestRequest<()> {
|
||||
|
||||
/// Create TestReqeust and set request uri
|
||||
/// Create TestRequest and set request uri
|
||||
pub fn with_uri(path: &str) -> TestRequest<()> {
|
||||
TestRequest::default().uri(path)
|
||||
}
|
||||
|
||||
/// Create TestReqeust and set header
|
||||
/// Create TestRequest and set header
|
||||
pub fn with_header<K, V>(key: K, value: V) -> TestRequest<()>
|
||||
where HeaderName: HttpTryFrom<K>,
|
||||
HeaderValue: HttpTryFrom<V>
|
||||
|
@@ -1,8 +1,8 @@
|
||||
use std::mem;
|
||||
use std::collections::VecDeque;
|
||||
use futures::{Async, Poll};
|
||||
use futures::sync::oneshot::Sender;
|
||||
use futures::unsync::oneshot;
|
||||
use smallvec::SmallVec;
|
||||
|
||||
use actix::{Actor, ActorState, ActorContext, AsyncContext,
|
||||
Address, SyncAddress, Handler, Subscriber, ResponseType, SpawnHandle};
|
||||
@@ -23,7 +23,7 @@ use ws::proto::{OpCode, CloseCode};
|
||||
pub struct WebsocketContext<A, S=()> where A: Actor<Context=WebsocketContext<A, S>>,
|
||||
{
|
||||
inner: ContextImpl<A>,
|
||||
stream: VecDeque<ContextFrame>,
|
||||
stream: Option<SmallVec<[ContextFrame; 2]>>,
|
||||
request: HttpRequest<S>,
|
||||
disconnected: bool,
|
||||
}
|
||||
@@ -88,7 +88,7 @@ impl<A, S: 'static> WebsocketContext<A, S> where A: Actor<Context=Self> {
|
||||
pub fn from_request(req: HttpRequest<S>) -> WebsocketContext<A, S> {
|
||||
WebsocketContext {
|
||||
inner: ContextImpl::new(None),
|
||||
stream: VecDeque::new(),
|
||||
stream: None,
|
||||
request: req,
|
||||
disconnected: false,
|
||||
}
|
||||
@@ -107,7 +107,7 @@ impl<A, S> WebsocketContext<A, S> where A: Actor<Context=Self> {
|
||||
#[inline]
|
||||
fn write<B: Into<Binary>>(&mut self, data: B) {
|
||||
if !self.disconnected {
|
||||
self.stream.push_back(ContextFrame::Chunk(Some(data.into())));
|
||||
self.add_frame(ContextFrame::Chunk(Some(data.into())));
|
||||
} else {
|
||||
warn!("Trying to write to disconnected response");
|
||||
}
|
||||
@@ -173,7 +173,7 @@ impl<A, S> WebsocketContext<A, S> where A: Actor<Context=Self> {
|
||||
pub fn drain(&mut self) -> Drain<A> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.inner.modify();
|
||||
self.stream.push_back(ContextFrame::Drain(tx));
|
||||
self.add_frame(ContextFrame::Drain(tx));
|
||||
Drain::new(rx)
|
||||
}
|
||||
|
||||
@@ -182,6 +182,13 @@ impl<A, S> WebsocketContext<A, S> where A: Actor<Context=Self> {
|
||||
pub fn connected(&self) -> bool {
|
||||
!self.disconnected
|
||||
}
|
||||
|
||||
fn add_frame(&mut self, frame: ContextFrame) {
|
||||
if self.stream.is_none() {
|
||||
self.stream = Some(SmallVec::new());
|
||||
}
|
||||
self.stream.as_mut().map(|s| s.push(frame));
|
||||
}
|
||||
}
|
||||
|
||||
impl<A, S> WebsocketContext<A, S> where A: Actor<Context=Self> {
|
||||
@@ -212,7 +219,7 @@ impl<A, S> ActorHttpContext for WebsocketContext<A, S> where A: Actor<Context=Se
|
||||
self.stop();
|
||||
}
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<ContextFrame>, Error> {
|
||||
fn poll(&mut self) -> Poll<Option<SmallVec<[ContextFrame;2]>>, Error> {
|
||||
let ctx: &mut WebsocketContext<A, S> = unsafe {
|
||||
mem::transmute(self as &mut WebsocketContext<A, S>)
|
||||
};
|
||||
@@ -225,8 +232,8 @@ impl<A, S> ActorHttpContext for WebsocketContext<A, S> where A: Actor<Context=Se
|
||||
}
|
||||
|
||||
// frames
|
||||
if let Some(frame) = self.stream.pop_front() {
|
||||
Ok(Async::Ready(Some(frame)))
|
||||
if let Some(data) = self.stream.take() {
|
||||
Ok(Async::Ready(Some(data)))
|
||||
} else if self.inner.alive() {
|
||||
Ok(Async::NotReady)
|
||||
} else {
|
||||
|
@@ -5,14 +5,7 @@ use bytes::BytesMut;
|
||||
|
||||
use body::Binary;
|
||||
use ws::proto::{OpCode, CloseCode};
|
||||
|
||||
|
||||
fn apply_mask(buf: &mut [u8], mask: &[u8; 4]) {
|
||||
let iter = buf.iter_mut().zip(mask.iter().cycle());
|
||||
for (byte, &key) in iter {
|
||||
*byte ^= key
|
||||
}
|
||||
}
|
||||
use ws::mask::apply_mask;
|
||||
|
||||
/// A struct representing a `WebSocket` frame.
|
||||
#[derive(Debug)]
|
||||
@@ -28,7 +21,7 @@ pub(crate) struct Frame {
|
||||
|
||||
impl Frame {
|
||||
|
||||
/// Desctructe frame
|
||||
/// Destruct frame
|
||||
pub fn unpack(self) -> (bool, OpCode, Binary) {
|
||||
(self.finished, self.opcode, self.payload)
|
||||
}
|
||||
|
120
src/ws/mask.rs
Normal file
120
src/ws/mask.rs
Normal file
@@ -0,0 +1,120 @@
|
||||
//! This is code from [Tungstenite project](https://github.com/snapview/tungstenite-rs)
|
||||
use std::cmp::min;
|
||||
use std::mem::uninitialized;
|
||||
use std::ptr::copy_nonoverlapping;
|
||||
|
||||
/// Mask/unmask a frame.
|
||||
#[inline]
|
||||
pub fn apply_mask(buf: &mut [u8], mask: &[u8; 4]) {
|
||||
apply_mask_fast32(buf, mask)
|
||||
}
|
||||
|
||||
/// A safe unoptimized mask application.
|
||||
#[inline]
|
||||
#[allow(dead_code)]
|
||||
fn apply_mask_fallback(buf: &mut [u8], mask: &[u8; 4]) {
|
||||
for (i, byte) in buf.iter_mut().enumerate() {
|
||||
*byte ^= mask[i & 3];
|
||||
}
|
||||
}
|
||||
|
||||
/// Faster version of `apply_mask()` which operates on 4-byte blocks.
|
||||
#[inline]
|
||||
#[allow(dead_code)]
|
||||
fn apply_mask_fast32(buf: &mut [u8], mask: &[u8; 4]) {
|
||||
// TODO replace this with read_unaligned() as it stabilizes.
|
||||
let mask_u32 = unsafe {
|
||||
let mut m: u32 = uninitialized();
|
||||
#[allow(trivial_casts)]
|
||||
copy_nonoverlapping(mask.as_ptr(), &mut m as *mut _ as *mut u8, 4);
|
||||
m
|
||||
};
|
||||
|
||||
let mut ptr = buf.as_mut_ptr();
|
||||
let mut len = buf.len();
|
||||
|
||||
// Possible first unaligned block.
|
||||
let head = min(len, (4 - (ptr as usize & 3)) & 3);
|
||||
let mask_u32 = if head > 0 {
|
||||
unsafe {
|
||||
xor_mem(ptr, mask_u32, head);
|
||||
ptr = ptr.offset(head as isize);
|
||||
}
|
||||
len -= head;
|
||||
if cfg!(target_endian = "big") {
|
||||
mask_u32.rotate_left(8 * head as u32)
|
||||
} else {
|
||||
mask_u32.rotate_right(8 * head as u32)
|
||||
}
|
||||
} else {
|
||||
mask_u32
|
||||
};
|
||||
|
||||
if len > 0 {
|
||||
debug_assert_eq!(ptr as usize % 4, 0);
|
||||
}
|
||||
|
||||
// Properly aligned middle of the data.
|
||||
while len > 4 {
|
||||
unsafe {
|
||||
*(ptr as *mut u32) ^= mask_u32;
|
||||
ptr = ptr.offset(4);
|
||||
len -= 4;
|
||||
}
|
||||
}
|
||||
|
||||
// Possible last block.
|
||||
if len > 0 {
|
||||
unsafe { xor_mem(ptr, mask_u32, len); }
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
// TODO: copy_nonoverlapping here compiles to call memcpy. While it is not so inefficient,
|
||||
// it could be done better. The compiler does not see that len is limited to 3.
|
||||
unsafe fn xor_mem(ptr: *mut u8, mask: u32, len: usize) {
|
||||
let mut b: u32 = uninitialized();
|
||||
#[allow(trivial_casts)]
|
||||
copy_nonoverlapping(ptr, &mut b as *mut _ as *mut u8, len);
|
||||
b ^= mask;
|
||||
#[allow(trivial_casts)]
|
||||
copy_nonoverlapping(&b as *const _ as *const u8, ptr, len);
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{apply_mask_fallback, apply_mask_fast32};
|
||||
|
||||
#[test]
|
||||
fn test_apply_mask() {
|
||||
let mask = [
|
||||
0x6d, 0xb6, 0xb2, 0x80,
|
||||
];
|
||||
let unmasked = vec![
|
||||
0xf3, 0x00, 0x01, 0x02, 0x03, 0x80, 0x81, 0x82,
|
||||
0xff, 0xfe, 0x00, 0x17, 0x74, 0xf9, 0x12, 0x03,
|
||||
];
|
||||
|
||||
// Check masking with proper alignment.
|
||||
{
|
||||
let mut masked = unmasked.clone();
|
||||
apply_mask_fallback(&mut masked, &mask);
|
||||
|
||||
let mut masked_fast = unmasked.clone();
|
||||
apply_mask_fast32(&mut masked_fast, &mask);
|
||||
|
||||
assert_eq!(masked, masked_fast);
|
||||
}
|
||||
|
||||
// Check masking without alignment.
|
||||
{
|
||||
let mut masked = unmasked.clone();
|
||||
apply_mask_fallback(&mut masked[1..], &mask);
|
||||
|
||||
let mut masked_fast = unmasked.clone();
|
||||
apply_mask_fast32(&mut masked_fast[1..], &mask);
|
||||
|
||||
assert_eq!(masked, masked_fast);
|
||||
}
|
||||
}
|
||||
}
|
@@ -58,6 +58,7 @@ use httpresponse::{ConnectionType, HttpResponse, HttpResponseBuilder};
|
||||
mod frame;
|
||||
mod proto;
|
||||
mod context;
|
||||
mod mask;
|
||||
|
||||
use ws::frame::Frame;
|
||||
use ws::proto::{hash_key, OpCode};
|
||||
|
@@ -152,6 +152,66 @@ fn test_body_br_streaming() {
|
||||
assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_head_empty() {
|
||||
let srv = test::TestServer::new(
|
||||
|app| app.handler(|_| {
|
||||
httpcodes::HTTPOk.build()
|
||||
.content_length(STR.len() as u64).finish()}));
|
||||
|
||||
let client = reqwest::Client::new();
|
||||
let mut res = client.head(&srv.url("/")).send().unwrap();
|
||||
assert!(res.status().is_success());
|
||||
let mut bytes = BytesMut::with_capacity(2048).writer();
|
||||
let len = res.headers()
|
||||
.get::<reqwest::header::ContentLength>().map(|ct_len| **ct_len).unwrap();
|
||||
assert_eq!(len, STR.len() as u64);
|
||||
let _ = res.copy_to(&mut bytes);
|
||||
let bytes = bytes.into_inner();
|
||||
assert!(bytes.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_head_binary() {
|
||||
let srv = test::TestServer::new(
|
||||
|app| app.handler(|_| {
|
||||
httpcodes::HTTPOk.build()
|
||||
.content_encoding(headers::ContentEncoding::Identity)
|
||||
.content_length(100).body(STR)}));
|
||||
|
||||
let client = reqwest::Client::new();
|
||||
let mut res = client.head(&srv.url("/")).send().unwrap();
|
||||
assert!(res.status().is_success());
|
||||
let mut bytes = BytesMut::with_capacity(2048).writer();
|
||||
let len = res.headers()
|
||||
.get::<reqwest::header::ContentLength>().map(|ct_len| **ct_len).unwrap();
|
||||
assert_eq!(len, STR.len() as u64);
|
||||
let _ = res.copy_to(&mut bytes);
|
||||
let bytes = bytes.into_inner();
|
||||
assert!(bytes.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_head_binary2() {
|
||||
let srv = test::TestServer::new(
|
||||
|app| app.handler(|_| {
|
||||
httpcodes::HTTPOk.build()
|
||||
.content_encoding(headers::ContentEncoding::Identity)
|
||||
.body(STR)
|
||||
}));
|
||||
|
||||
let client = reqwest::Client::new();
|
||||
let mut res = client.head(&srv.url("/")).send().unwrap();
|
||||
assert!(res.status().is_success());
|
||||
let mut bytes = BytesMut::with_capacity(2048).writer();
|
||||
let len = res.headers()
|
||||
.get::<reqwest::header::ContentLength>().map(|ct_len| **ct_len).unwrap();
|
||||
assert_eq!(len, STR.len() as u64);
|
||||
let _ = res.copy_to(&mut bytes);
|
||||
let bytes = bytes.into_inner();
|
||||
assert!(bytes.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_body_length() {
|
||||
let srv = test::TestServer::new(
|
||||
|
Reference in New Issue
Block a user