1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-07-06 02:45:21 +02:00

Compare commits

...

38 Commits

Author SHA1 Message Date
e8e2ca1526 refactor alpn support; upgrade openssl to 0.10 2018-01-25 10:24:04 -08:00
78967dea13 stop http context immediately 2018-01-24 20:17:14 -08:00
58a5d493b7 re-eanble write backpressure for h1 connections 2018-01-24 20:12:49 -08:00
c5341017cd fix typo 2018-01-23 15:39:53 -08:00
f4873fcdee stop websocket context 2018-01-23 15:35:39 -08:00
35efd017bb impl waiting for HttpContext 2018-01-23 09:42:04 -08:00
fb76c490c6 mention tokio handle in guide 2018-01-22 20:10:05 -08:00
3653c78e92 check example on stable 2018-01-22 19:49:19 -08:00
1053c44326 pin new actix version 2018-01-22 17:01:54 -08:00
e6ea177181 impl WebsocketContext::waiting() method 2018-01-22 16:55:50 -08:00
1957469061 code of conduct 2018-01-21 15:29:02 -08:00
2227120ae0 exclude examples 2018-01-21 09:09:19 -08:00
21c8c0371d travis config 2018-01-21 08:50:29 -08:00
1914a6a0d8 Always enable content encoding if encoding explicitly selected 2018-01-21 08:31:46 -08:00
1cff4619e7 reduce threshold for content encoding 2018-01-21 08:12:32 -08:00
7bb7adf89c relax InternalError constraints 2018-01-20 22:02:42 -08:00
f55ff24925 fix guide example 2018-01-20 21:40:18 -08:00
f5f78d79e6 update doc strings 2018-01-20 21:16:31 -08:00
9180625dfd refactor helper error types 2018-01-20 21:11:46 -08:00
552320bae2 add error logging guide section 2018-01-20 20:21:01 -08:00
7cf221f767 Log request processing errors 2018-01-20 20:12:24 -08:00
98931a8623 test case for broken transfer encoding 2018-01-20 16:51:18 -08:00
ae10a89014 use ws masking from tungstenite project 2018-01-20 16:47:34 -08:00
71d534dadb CORS middleware: allowed_headers is defaulting to None #50 2018-01-20 16:36:57 -08:00
867bb1d409 Merge branch 'master' of github.com:actix/actix-web 2018-01-20 16:12:51 -08:00
91c44a1cf1 Fix HEAD requests handling 2018-01-20 16:12:38 -08:00
3bc60a8d5d Merge pull request #53 from andreevlex/spelling-check-2
spelling check
2018-01-16 12:07:58 -08:00
58df8fa4b9 spelling check 2018-01-16 21:59:33 +03:00
81f92b43e5 Merge pull request #52 from andreevlex/spelling-check
spelling check
2018-01-15 14:16:54 -08:00
e1d9c3803b spelling check 2018-01-16 00:47:25 +03:00
a7c24aace1 flush is useless 2018-01-14 19:28:34 -08:00
89a89e7b18 refactor shared bytes api 2018-01-14 17:00:28 -08:00
3425f7be40 fix tests 2018-01-14 14:58:58 -08:00
09a6f8a34f disable alpn feature 2018-01-14 14:44:32 -08:00
7060f298b4 use more binary 2018-01-14 14:40:39 -08:00
33dbe15760 use Binary for writer trait 2018-01-14 13:50:38 -08:00
e95c7dfc29 use local actix-web for examples 2018-01-13 19:04:07 -08:00
927a92fcac impl HttpHandler for Box<HttpHandler> and add helper method Application::boxed() #49 2018-01-13 18:58:17 -08:00
55 changed files with 1023 additions and 506 deletions

View File

@ -42,15 +42,17 @@ before_script:
script:
- |
if [[ "$TRAVIS_RUST_VERSION" == "stable" ]]; then
if [[ "$TRAVIS_RUST_VERSION" == "1.20.0" ]]; then
cargo clean
USE_SKEPTIC=1 cargo test --features=alpn
else
cargo test --features=alpn
cargo clean
cargo test
# --features=alpn
fi
- |
if [[ "$TRAVIS_RUST_VERSION" == "1.20.0" ]]; then
if [[ "$TRAVIS_RUST_VERSION" == "stable" ]]; then
cd examples/basics && cargo check && cd ../..
cd examples/hello-world && cargo check && cd ../..
cd examples/multipart && cargo check && cd ../..

View File

@ -1,5 +1,29 @@
# Changes
## 0.3.3 (2018-01-25)
* Stop processing any events after context stop
* Re-enable write back-pressure for h1 connections
* Refactor HttpServer::start_ssl() method
* Upgrade openssl to 0.10
## 0.3.2 (2018-01-21)
* Fix HEAD requests handling
* Log request processing errors
* Always enable content encoding if encoding explicitly selected
* Allow multiple Applications on a single server with different state #49
* CORS middleware: allowed_headers is defaulting to None #50
## 0.3.1 (2018-01-13)
* Fix directory entry path #47

46
CODE_OF_CONDUCT.md Normal file
View File

@ -0,0 +1,46 @@
# Contributor Covenant Code of Conduct
## Our Pledge
In the interest of fostering an open and welcoming environment, we as contributors and maintainers pledge to making participation in our project and our community a harassment-free experience for everyone, regardless of age, body size, disability, ethnicity, gender identity and expression, level of experience, nationality, personal appearance, race, religion, or sexual identity and orientation.
## Our Standards
Examples of behavior that contributes to creating a positive environment include:
* Using welcoming and inclusive language
* Being respectful of differing viewpoints and experiences
* Gracefully accepting constructive criticism
* Focusing on what is best for the community
* Showing empathy towards other community members
Examples of unacceptable behavior by participants include:
* The use of sexualized language or imagery and unwelcome sexual attention or advances
* Trolling, insulting/derogatory comments, and personal or political attacks
* Public or private harassment
* Publishing others' private information, such as a physical or electronic address, without explicit permission
* Other conduct which could reasonably be considered inappropriate in a professional setting
## Our Responsibilities
Project maintainers are responsible for clarifying the standards of acceptable behavior and are expected to take appropriate and fair corrective action in response to any instances of unacceptable behavior.
Project maintainers have the right and responsibility to remove, edit, or reject comments, commits, code, wiki edits, issues, and other contributions that are not aligned to this Code of Conduct, or to ban temporarily or permanently any contributor for other behaviors that they deem inappropriate, threatening, offensive, or harmful.
## Scope
This Code of Conduct applies both within project spaces and in public spaces when an individual is representing the project or its community. Examples of representing a project or community include using an official project e-mail address, posting via an official social media account, or acting as an appointed representative at an online or offline event. Representation of a project may be further defined and clarified by project maintainers.
## Enforcement
Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by contacting the project team at fafhrd91@gmail.com. The project team will review and investigate all complaints, and will respond in a way that it deems appropriate to the circumstances. The project team is obligated to maintain confidentiality with regard to the reporter of an incident. Further details of specific enforcement policies may be posted separately.
Project maintainers who do not follow or enforce the Code of Conduct in good faith may face temporary or permanent repercussions as determined by other members of the project's leadership.
## Attribution
This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4, available at [http://contributor-covenant.org/version/1/4][version]
[homepage]: http://contributor-covenant.org
[version]: http://contributor-covenant.org/version/1/4/

View File

@ -1,6 +1,6 @@
[package]
name = "actix-web"
version = "0.3.1"
version = "0.3.3"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix web framework"
readme = "README.md"
@ -12,7 +12,7 @@ categories = ["network-programming", "asynchronous",
"web-programming::http-server", "web-programming::websocket"]
license = "MIT/Apache-2.0"
exclude = [".gitignore", ".travis.yml", ".cargo/config",
"appveyor.yml", "./examples/static/*"]
"appveyor.yml", "/examples/**"]
build = "build.rs"
[badges]
@ -71,17 +71,14 @@ native-tls = { version="0.1", optional = true }
tokio-tls = { version="0.1", optional = true }
# openssl
tokio-openssl = { version="0.1", optional = true }
openssl = { version="0.10", optional = true }
tokio-openssl = { version="0.2", optional = true }
[dependencies.actix]
version = "^0.4.2"
[dependencies.openssl]
version = "0.9"
optional = true
version = "^0.4.5"
[dev-dependencies]
env_logger = "0.4"
env_logger = "0.5"
reqwest = "0.8"
skeptic = "0.13"
serde_derive = "1.0"
@ -93,7 +90,6 @@ version_check = "0.1"
[profile.release]
lto = true
opt-level = 3
# debug = true
[workspace]
members = [

View File

@ -69,3 +69,9 @@ This project is licensed under either of
* MIT license ([LICENSE-MIT](LICENSE-MIT) or [http://opensource.org/licenses/MIT](http://opensource.org/licenses/MIT))
at your option.
## Code of Conduct
Contribution to the actix-web crate is organized under the terms of the
Contributor Covenant, the maintainer of actix-web, @fafhrd91, promises to
intervene to uphold that code of conduct.

View File

@ -25,6 +25,7 @@ fn main() {
"guide/src/qs_10.md",
"guide/src/qs_12.md",
"guide/src/qs_13.md",
"guide/src/qs_14.md",
]);
} else {
let _ = fs::File::create(f);

View File

@ -6,6 +6,6 @@ workspace = "../.."
[dependencies]
futures = "*"
env_logger = "0.4"
env_logger = "0.5"
actix = "0.4"
actix-web = { path = "../../" }
actix-web = { path="../.." }

View File

@ -7,6 +7,7 @@ extern crate env_logger;
extern crate futures;
use futures::Stream;
use std::{io, env};
use actix_web::*;
use actix_web::middleware::RequestSession;
use futures::future::{FutureResult, result};
@ -56,17 +57,17 @@ fn index(mut req: HttpRequest) -> Result<HttpResponse> {
fn p404(req: HttpRequest) -> Result<HttpResponse> {
// html
let html = format!(r#"<!DOCTYPE html><html><head><title>actix - basics</title><link rel="shortcut icon" type="image/x-icon" href="/favicon.ico" /></head>
let html = r#"<!DOCTYPE html><html><head><title>actix - basics</title><link rel="shortcut icon" type="image/x-icon" href="/favicon.ico" /></head>
<body>
<a href="index.html">back to home</a>
<h1>404</h1>
</body>
</html>"#);
</html>"#;
// response
Ok(HttpResponse::build(StatusCode::NOT_FOUND)
.content_type("text/html; charset=utf-8")
.body(&html).unwrap())
.body(html).unwrap())
}
@ -92,8 +93,9 @@ fn with_param(req: HttpRequest) -> Result<HttpResponse>
}
fn main() {
::std::env::set_var("RUST_LOG", "actix_web=info");
let _ = env_logger::init();
env::set_var("RUST_LOG", "actix_web=debug");
env::set_var("RUST_BACKTRACE", "1");
env_logger::init();
let sys = actix::System::new("basic-example");
let addr = HttpServer::new(
@ -121,6 +123,9 @@ fn main() {
_ => httpcodes::HTTPNotFound,
}
}))
.resource("/error.html", |r| r.f(|req| {
error::ErrorBadRequest(io::Error::new(io::ErrorKind::Other, "test"))
}))
// static files
.handler("/static/", fs::StaticFiles::new("../static/", true))
// redirect

View File

@ -5,9 +5,9 @@ authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
workspace = "../.."
[dependencies]
env_logger = "0.4"
env_logger = "0.5"
actix = "0.4"
actix-web = { git = "https://github.com/actix/actix-web" }
actix-web = { path = "../../" }
futures = "0.1"
uuid = { version = "0.5", features = ["serde", "v4"] }

View File

@ -8,7 +8,7 @@ use diesel::prelude::*;
use models;
use schema;
/// This is db executor actor. We are going to run 3 of them in parallele.
/// This is db executor actor. We are going to run 3 of them in parallel.
pub struct DbExecutor(pub SqliteConnection);
/// This is only message that this actor can handle, but it is easy to extend number of

View File

@ -5,6 +5,6 @@ authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
workspace = "../.."
[dependencies]
env_logger = "0.4"
env_logger = "0.5"
actix = "0.4"
actix-web = { path = "../../" }

View File

@ -15,4 +15,4 @@ serde_derive = "1.0"
json = "*"
actix = "0.4"
actix-web = { git = "https://github.com/actix/actix-web" }
actix-web = { path="../../" }

View File

@ -12,4 +12,4 @@ path = "src/main.rs"
env_logger = "*"
futures = "0.1"
actix = "0.4"
actix-web = { git = "https://github.com/actix/actix-web" }
actix-web = { path="../../" }

View File

@ -6,6 +6,6 @@ workspace = "../.."
[dependencies]
futures = "*"
env_logger = "0.4"
env_logger = "0.5"
actix = "0.4"
actix-web = { git = "https://github.com/actix/actix-web" }
actix-web = { path = "../../" }

View File

@ -1,5 +1,5 @@
#![cfg_attr(feature="cargo-clippy", allow(needless_pass_by_value))]
//! There are two level of statfulness in actix-web. Application has state
//! There are two level of statefulness in actix-web. Application has state
//! that is shared across all handlers within same Application.
//! And individual handler can have state.
@ -33,7 +33,7 @@ struct MyWebSocket {
}
impl Actor for MyWebSocket {
type Context = HttpContext<Self, AppState>;
type Context = ws::WebsocketContext<Self, AppState>;
}
impl Handler<ws::Message> for MyWebSocket {
@ -43,9 +43,9 @@ impl Handler<ws::Message> for MyWebSocket {
self.counter += 1;
println!("WS({}): {:?}", self.counter, msg);
match msg {
ws::Message::Ping(msg) => ws::WsWriter::pong(ctx, &msg),
ws::Message::Text(text) => ws::WsWriter::text(ctx, &text),
ws::Message::Binary(bin) => ws::WsWriter::binary(ctx, bin),
ws::Message::Ping(msg) => ctx.pong(&msg),
ws::Message::Text(text) => ctx.text(&text),
ws::Message::Binary(bin) => ctx.binary(bin),
ws::Message::Closed | ws::Message::Error => {
ctx.stop();
}

View File

@ -5,7 +5,7 @@ authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
workspace = "../.."
[dependencies]
env_logger = "0.4"
env_logger = "0.5"
actix = "0.4"
actix-web = { git = "https://github.com/actix/actix-web" }
actix-web = { path = "../../" }
tera = "*"

View File

@ -9,6 +9,7 @@ name = "server"
path = "src/main.rs"
[dependencies]
env_logger = "0.4"
env_logger = "0.5"
actix = "^0.4.2"
actix-web = { git = "https://github.com/actix/actix-web", features=["alpn"] }
actix-web = { path = "../../", features=["alpn"] }
openssl = { version="0.10", features = ["v110"] }

31
examples/tls/cert.pem Normal file
View File

@ -0,0 +1,31 @@
-----BEGIN CERTIFICATE-----
MIIFPjCCAyYCCQDvLYiYD+jqeTANBgkqhkiG9w0BAQsFADBhMQswCQYDVQQGEwJV
UzELMAkGA1UECAwCQ0ExCzAJBgNVBAcMAlNGMRAwDgYDVQQKDAdDb21wYW55MQww
CgYDVQQLDANPcmcxGDAWBgNVBAMMD3d3dy5leGFtcGxlLmNvbTAeFw0xODAxMjUx
NzQ2MDFaFw0xOTAxMjUxNzQ2MDFaMGExCzAJBgNVBAYTAlVTMQswCQYDVQQIDAJD
QTELMAkGA1UEBwwCU0YxEDAOBgNVBAoMB0NvbXBhbnkxDDAKBgNVBAsMA09yZzEY
MBYGA1UEAwwPd3d3LmV4YW1wbGUuY29tMIICIjANBgkqhkiG9w0BAQEFAAOCAg8A
MIICCgKCAgEA2WzIA2IpVR9Tb9EFhITlxuhE5rY2a3S6qzYNzQVgSFggxXEPn8k1
sQEcer5BfAP986Sck3H0FvB4Bt/I8PwOtUCmhwcc8KtB5TcGPR4fjXnrpC+MIK5U
NLkwuyBDKziYzTdBj8kUFX1WxmvEHEgqToPOZfBgsS71cJAR/zOWraDLSRM54jXy
voLZN4Ti9rQagQrvTQ44Vz5ycDQy7UxtbUGh1CVv69vNVr7/SOOh/Nw5FNOZWLWr
odGyoec5wh9iqRZgRqiTUc6Lt7V2RWc2X2gjwST2UfI+U46Ip3oaQ7ZD4eAkoqND
xdniBZAykVG3c/99ux4BAESTF8fsNch6UticBxYMuTu+ouvP0psfI9wwwNliJDmA
CRUTB9AgRynbL1AzhqQoDfsb98IZfjfNOpwnwuLwpMAPhbgd5KNdZaIJ4Hb6/stI
yFElOExxd3TAxF2Gshd/lq1JcNHAZ1DSXV5MvOWT/NWgXwbIzUgQ8eIi+HuDYX2U
UuaB6R8tbd52H7rbUv6HrfinuSlKWqjSYLkiKHkwUpoMw8y9UycRSzs1E9nPwPTO
vRXb0mNCQeBCV9FvStNVXdCUTT8LGPv87xSD2pmt7LijlE6mHLG8McfcWkzA69un
CEHIFAFDimTuN7EBljc119xWFTcHMyoZAfFF+oTqwSbBGImruCxnaJECAwEAATAN
BgkqhkiG9w0BAQsFAAOCAgEApavsgsn7SpPHfhDSN5iZs1ILZQRewJg0Bty0xPfk
3tynSW6bNH3nSaKbpsdmxxomthNSQgD2heOq1By9YzeOoNR+7Pk3s4FkASnf3ToI
JNTUasBFFfaCG96s4Yvs8KiWS/k84yaWuU8c3Wb1jXs5Rv1qE1Uvuwat1DSGXSoD
JNluuIkCsC4kWkyq5pWCGQrabWPRTWsHwC3PTcwSRBaFgYLJaR72SloHB1ot02zL
d2age9dmFRFLLCBzP+D7RojBvL37qS/HR+rQ4SoQwiVc/JzaeqSe7ZbvEH9sZYEu
ALowJzgbwro7oZflwTWunSeSGDSltkqKjvWvZI61pwfHKDahUTmZ5h2y67FuGEaC
CIOUI8dSVSPKITxaq3JL4ze2e9/0Lt7hj19YK2uUmtMAW5Tirz4Yx5lyGH9U8Wur
y/X8VPxTc4A9TMlJgkyz0hqvhbPOT/zSWB10zXh0glKAsSBryAOEDxV1UygmSir7
YV8Qaq+oyKUTMc1MFq5vZ07M51EPaietn85t8V2Y+k/8XYltRp32NxsypxAJuyxh
g/ko6RVTrWa1sMvz/F9LFqAdKiK5eM96lh9IU4xiLg4ob8aS/GRAA8oIFkZFhLrt
tOwjIUPmEPyHWFi8dLpNuQKYalLYhuwZftG/9xV+wqhKGZO9iPrpHSYBRTap8w2y
1QU=
-----END CERTIFICATE-----

Binary file not shown.

51
examples/tls/key.pem Normal file
View File

@ -0,0 +1,51 @@
-----BEGIN RSA PRIVATE KEY-----
MIIJKAIBAAKCAgEA2WzIA2IpVR9Tb9EFhITlxuhE5rY2a3S6qzYNzQVgSFggxXEP
n8k1sQEcer5BfAP986Sck3H0FvB4Bt/I8PwOtUCmhwcc8KtB5TcGPR4fjXnrpC+M
IK5UNLkwuyBDKziYzTdBj8kUFX1WxmvEHEgqToPOZfBgsS71cJAR/zOWraDLSRM5
4jXyvoLZN4Ti9rQagQrvTQ44Vz5ycDQy7UxtbUGh1CVv69vNVr7/SOOh/Nw5FNOZ
WLWrodGyoec5wh9iqRZgRqiTUc6Lt7V2RWc2X2gjwST2UfI+U46Ip3oaQ7ZD4eAk
oqNDxdniBZAykVG3c/99ux4BAESTF8fsNch6UticBxYMuTu+ouvP0psfI9wwwNli
JDmACRUTB9AgRynbL1AzhqQoDfsb98IZfjfNOpwnwuLwpMAPhbgd5KNdZaIJ4Hb6
/stIyFElOExxd3TAxF2Gshd/lq1JcNHAZ1DSXV5MvOWT/NWgXwbIzUgQ8eIi+HuD
YX2UUuaB6R8tbd52H7rbUv6HrfinuSlKWqjSYLkiKHkwUpoMw8y9UycRSzs1E9nP
wPTOvRXb0mNCQeBCV9FvStNVXdCUTT8LGPv87xSD2pmt7LijlE6mHLG8McfcWkzA
69unCEHIFAFDimTuN7EBljc119xWFTcHMyoZAfFF+oTqwSbBGImruCxnaJECAwEA
AQKCAgAME3aoeXNCPxMrSri7u4Xnnk71YXl0Tm9vwvjRQlMusXZggP8VKN/KjP0/
9AE/GhmoxqPLrLCZ9ZE1EIjgmZ9Xgde9+C8rTtfCG2RFUL7/5J2p6NonlocmxoJm
YkxYwjP6ce86RTjQWL3RF3s09u0inz9/efJk5O7M6bOWMQ9VZXDlBiRY5BYvbqUR
6FeSzD4MnMbdyMRoVBeXE88gTvZk8xhB6DJnLzYgc0tKiRoeKT0iYv5JZw25VyRM
ycLzfTrFmXCPfB1ylb483d9Ly4fBlM8nkx37PzEnAuukIawDxsPOb9yZC+hfvNJI
7NFiMN+3maEqG2iC00w4Lep4skHY7eHUEUMl+Wjr+koAy2YGLWAwHZQTm7iXn9Ab
L6adL53zyCKelRuEQOzbeosJAqS+5fpMK0ekXyoFIuskj7bWuIoCX7K/kg6q5IW+
vC2FrlsrbQ79GztWLVmHFO1I4J9M5r666YS0qdh8c+2yyRl4FmSiHfGxb3eOKpxQ
b6uI97iZlkxPF9LYUCSc7wq0V2gGz+6LnGvTHlHrOfVXqw/5pLAKhXqxvnroDTwz
0Ay/xFF6ei/NSxBY5t8ztGCBm45wCU3l8pW0X6dXqwUipw5b4MRy1VFRu6rqlmbL
OPSCuLxqyqsigiEYsBgS/icvXz9DWmCQMPd2XM9YhsHvUq+R4QKCAQEA98EuMMXI
6UKIt1kK2t/3OeJRyDd4iv/fCMUAnuPjLBvFE4cXD/SbqCxcQYqb+pue3PYkiTIC
71rN8OQAc5yKhzmmnCE5N26br/0pG4pwEjIr6mt8kZHmemOCNEzvhhT83nfKmV0g
9lNtuGEQMiwmZrpUOF51JOMC39bzcVjYX2Cmvb7cFbIq3lR0zwM+aZpQ4P8LHCIu
bgHmwbdlkLyIULJcQmHIbo6nPFB3ZZE4mqmjwY+rA6Fh9rgBa8OFCfTtrgeYXrNb
IgZQ5U8GoYRPNC2ot0vpTinraboa/cgm6oG4M7FW1POCJTl+/ktHEnKuO5oroSga
/BSg7hCNFVaOhwKCAQEA4Kkys0HtwEbV5mY/NnvUD5KwfXX7BxoXc9lZ6seVoLEc
KjgPYxqYRVrC7dB2YDwwp3qcRTi/uBAgFNm3iYlDzI4xS5SeaudUWjglj7BSgXE2
iOEa7EwcvVPluLaTgiWjlzUKeUCNNHWSeQOt+paBOT+IgwRVemGVpAgkqQzNh/nP
tl3p9aNtgzEm1qVlPclY/XUCtf3bcOR+z1f1b4jBdn0leu5OhnxkC+Htik+2fTXD
jt6JGrMkanN25YzsjnD3Sn+v6SO26H99wnYx5oMSdmb8SlWRrKtfJHnihphjG/YY
l1cyorV6M/asSgXNQfGJm4OuJi0I4/FL2wLUHnU+JwKCAQEAzh4WipcRthYXXcoj
gMKRkMOb3GFh1OpYqJgVExtudNTJmZxq8GhFU51MR27Eo7LycMwKy2UjEfTOnplh
Us2qZiPtW7k8O8S2m6yXlYUQBeNdq9IuuYDTaYD94vsazscJNSAeGodjE+uGvb1q
1wLqE87yoE7dUInYa1cOA3+xy2/CaNuviBFJHtzOrSb6tqqenQEyQf6h9/12+DTW
t5pSIiixHrzxHiFqOoCLRKGToQB+71rSINwTf0nITNpGBWmSj5VcC3VV3TG5/XxI
fPlxV2yhD5WFDPVNGBGvwPDSh4jSMZdZMSNBZCy4XWFNSKjGEWoK4DFYed3DoSt9
5IG1YwKCAQA63ntHl64KJUWlkwNbboU583FF3uWBjee5VqoGKHhf3CkKMxhtGqnt
+oN7t5VdUEhbinhqdx1dyPPvIsHCS3K1pkjqii4cyzNCVNYa2dQ00Qq+QWZBpwwc
3GAkz8rFXsGIPMDa1vxpU6mnBjzPniKMcsZ9tmQDppCEpBGfLpio2eAA5IkK8eEf
cIDB3CM0Vo94EvI76CJZabaE9IJ+0HIJb2+jz9BJ00yQBIqvJIYoNy9gP5Xjpi+T
qV/tdMkD5jwWjHD3AYHLWKUGkNwwkAYFeqT/gX6jpWBP+ZRPOp011X3KInJFSpKU
DT5GQ1Dux7EMTCwVGtXqjO8Ym5wjwwsfAoIBAEcxlhIW1G6BiNfnWbNPWBdh3v/K
5Ln98Rcrz8UIbWyl7qNPjYb13C1KmifVG1Rym9vWMO3KuG5atK3Mz2yLVRtmWAVc
fxzR57zz9MZFDun66xo+Z1wN3fVxQB4CYpOEI4Lb9ioX4v85hm3D6RpFukNtRQEc
Gfr4scTjJX4jFWDp0h6ffMb8mY+quvZoJ0TJqV9L9Yj6Ksdvqez/bdSraev97bHQ
4gbQxaTZ6WjaD4HjpPQefMdWp97Metg0ZQSS8b8EzmNFgyJ3XcjirzwliKTAQtn6
I2sd0NCIooelrKRD8EJoDUwxoOctY7R97wpZ7/wEHU45cBCbRV3H4JILS5c=
-----END RSA PRIVATE KEY-----

View File

@ -2,14 +2,13 @@
extern crate actix;
extern crate actix_web;
extern crate env_logger;
use std::fs::File;
use std::io::Read;
extern crate openssl;
use actix_web::*;
use openssl::ssl::{SslMethod, SslAcceptor, SslFiletype};
/// somple handle
/// simple handle
fn index(req: HttpRequest) -> Result<HttpResponse> {
println!("{:?}", req);
Ok(httpcodes::HTTPOk
@ -20,15 +19,15 @@ fn index(req: HttpRequest) -> Result<HttpResponse> {
fn main() {
if ::std::env::var("RUST_LOG").is_err() {
::std::env::set_var("RUST_LOG", "actix_web=trace");
::std::env::set_var("RUST_LOG", "actix_web=info");
}
let _ = env_logger::init();
let sys = actix::System::new("ws-example");
let mut file = File::open("identity.pfx").unwrap();
let mut pkcs12 = vec![];
file.read_to_end(&mut pkcs12).unwrap();
let pkcs12 = Pkcs12::from_der(&pkcs12).unwrap().parse("12345").unwrap();
// load ssl keys
let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap();
builder.set_private_key_file("key.pem", SslFiletype::PEM).unwrap();
builder.set_certificate_chain_file("cert.pem").unwrap();
let addr = HttpServer::new(
|| Application::new()
@ -44,7 +43,7 @@ fn main() {
.body(Body::Empty)
})))
.bind("127.0.0.1:8443").unwrap()
.start_ssl(&pkcs12).unwrap();
.start_ssl(builder).unwrap();
println!("Started http server: 127.0.0.1:8443");
let _ = sys.run();

View File

@ -26,4 +26,4 @@ serde_json = "1.0"
serde_derive = "1.0"
actix = "^0.4.2"
actix-web = { git = "https://github.com/actix/actix-web" }
actix-web = { path="../../" }

View File

@ -16,8 +16,8 @@ Chat server listens for incoming tcp connections. Server can access several type
* `\list` - list all available rooms
* `\join name` - join room, if room does not exist, create new one
* `\name name` - set session name
* `some message` - just string, send messsage to all peers in same room
* client has to send heartbeat `Ping` messages, if server does not receive a heartbeat message for 10 seconds connection gets droppped
* `some message` - just string, send message to all peers in same room
* client has to send heartbeat `Ping` messages, if server does not receive a heartbeat message for 10 seconds connection gets dropped
To start server use command: `cargo run --bin server`

View File

@ -16,7 +16,7 @@ use codec::{ChatRequest, ChatResponse, ChatCodec};
#[derive(Message)]
pub struct Message(pub String);
/// `ChatSession` actor is responsible for tcp peer communitions.
/// `ChatSession` actor is responsible for tcp peer communications.
pub struct ChatSession {
/// unique session id
id: usize,
@ -30,7 +30,7 @@ pub struct ChatSession {
impl Actor for ChatSession {
/// For tcp communication we are going to use `FramedContext`.
/// It is convinient wrapper around `Framed` object from `tokio_io`
/// It is convenient wrapper around `Framed` object from `tokio_io`
type Context = FramedContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
@ -149,7 +149,7 @@ impl ChatSession {
}
/// Define tcp server that will accept incomint tcp connection and create
/// Define tcp server that will accept incoming tcp connection and create
/// chat actors.
pub struct TcpServer {
chat: SyncAddress<ChatServer>,

View File

@ -12,24 +12,26 @@ With enable `alpn` feature `HttpServer` provides
```toml
[dependencies]
actix-web = { git = "https://github.com/actix/actix-web", features=["alpn"] }
actix-web = { version = "0.3.3", features=["alpn"] }
openssl = { version="0.10", features = ["v110"] }
```
```rust,ignore
use std::fs::File;
use actix_web::*;
use openssl::ssl::{SslMethod, SslAcceptor, SslFiletype};
fn main() {
let mut file = File::open("identity.pfx").unwrap();
let mut pkcs12 = vec![];
file.read_to_end(&mut pkcs12).unwrap();
let pkcs12 = Pkcs12::from_der(&pkcs12).unwrap().parse("12345").unwrap();
// load ssl keys
let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap();
builder.set_private_key_file("key.pem", SslFiletype::PEM).unwrap();
builder.set_certificate_chain_file("cert.pem").unwrap();
HttpServer::new(
|| Application::new()
.resource("/index.html", |r| r.f(index)))
.bind("127.0.0.1:8080").unwrap();
.serve_ssl(pkcs12).unwrap();
.serve_ssl(builder).unwrap();
}
```

View File

@ -235,3 +235,12 @@ fn main() {
```
Both methods could be combined. (i.e Async response with streaming body)
## Tokio core handle
Any actix web handler runs within properly configured
[actix system](https://actix.github.io/actix/actix/struct.System.html)
and [arbiter](https://actix.github.io/actix/actix/struct.Arbiter.html).
You can always get access to tokio handle via
[Arbiter::handle()](https://actix.github.io/actix/actix/struct.Arbiter.html#method.handle)
method.

View File

@ -134,3 +134,18 @@ fn index(req: HttpRequest) -> Result<&'static str> {
```
In this example *BAD REQUEST* response get generated for `MyError` error.
## Error logging
Actix logs all errors with `WARN` log level. If log level set to `DEBUG`
and `RUST_BACKTRACE` is enabled, backtrace get logged. The Error type uses
cause's error backtrace if available, if the underlying failure does not provide
a backtrace, a new backtrace is constructed pointing to that conversion point
(rather than the origin of the error). This construction only happens if there
is no underlying backtrace; if it does have a backtrace no new backtrace is constructed.
You can enable backtrace and debug logging with following command:
```
>> RUST_BACKTRACE=1 RUST_LOG=actix_web=debug cargo run
```

View File

@ -59,9 +59,11 @@ impl<S: 'static> PipelineHandler<S> for Inner<S> {
#[cfg(test)]
impl<S: 'static> HttpApplication<S> {
#[cfg(test)]
pub(crate) fn run(&mut self, req: HttpRequest<S>) -> Reply {
self.inner.borrow_mut().handle(req)
}
#[cfg(test)]
pub(crate) fn prepare_request(&self, req: HttpRequest) -> HttpRequest<S> {
req.with_state(Rc::clone(&self.state), self.router.clone())
}
@ -134,7 +136,7 @@ impl<S> Application<S> where S: 'static {
/// Create application with specific state. Application can be
/// configured with builder-like pattern.
///
/// State is shared with all reousrces within same application and could be
/// State is shared with all resources within same application and could be
/// accessed with `HttpRequest::state()` method.
pub fn with_state(state: S) -> Application<S> {
Application {
@ -154,7 +156,7 @@ impl<S> Application<S> where S: 'static {
/// Set application prefix
///
/// Only requests that matches application's prefix get processed by this application.
/// Application prefix always contains laading "/" slash. If supplied prefix
/// Application prefix always contains leading "/" slash. If supplied prefix
/// does not contain leading slash, it get inserted. Prefix should
/// consists valid path segments. i.e for application with
/// prefix `/app` any request with following paths `/app`, `/app/` or `/app/test`
@ -356,6 +358,40 @@ impl<S> Application<S> where S: 'static {
middlewares: Rc::new(parts.middlewares),
}
}
/// Convenience method for creating `Box<HttpHandler>` instance.
///
/// This method is useful if you need to register several application instances
/// with different state.
///
/// ```rust
/// # use std::thread;
/// # extern crate actix_web;
/// use actix_web::*;
///
/// struct State1;
///
/// struct State2;
///
/// fn main() {
/// # thread::spawn(|| {
/// HttpServer::new(|| { vec![
/// Application::with_state(State1)
/// .prefix("/app1")
/// .resource("/", |r| r.h(httpcodes::HTTPOk))
/// .boxed(),
/// Application::with_state(State2)
/// .prefix("/app2")
/// .resource("/", |r| r.h(httpcodes::HTTPOk))
/// .boxed() ]})
/// .bind("127.0.0.1:8080").unwrap()
/// .run()
/// # });
/// }
/// ```
pub fn boxed(mut self) -> Box<HttpHandler> {
Box::new(self.finish())
}
}
impl<S: 'static> IntoHttpHandler for Application<S> {

View File

@ -1,4 +1,4 @@
use std::fmt;
use std::{fmt, mem};
use std::rc::Rc;
use std::sync::Arc;
use bytes::{Bytes, BytesMut};
@ -31,7 +31,7 @@ pub enum Binary {
Bytes(Bytes),
/// Static slice
Slice(&'static [u8]),
/// Shared stirng body
/// Shared string body
SharedString(Rc<String>),
/// Shared string body
#[doc(hidden)]
@ -122,6 +122,22 @@ impl Binary {
pub fn from_slice(s: &[u8]) -> Binary {
Binary::Bytes(Bytes::from(s))
}
/// Convert Binary to a Bytes instance
pub fn take(&mut self) -> Bytes {
mem::replace(self, Binary::Slice(b"")).into()
}
}
impl Clone for Binary {
fn clone(&self) -> Binary {
match *self {
Binary::Bytes(ref bytes) => Binary::Bytes(bytes.clone()),
Binary::Slice(slice) => Binary::Bytes(Bytes::from(slice)),
Binary::SharedString(ref s) => Binary::Bytes(Bytes::from(s.as_str())),
Binary::ArcSharedString(ref s) => Binary::Bytes(Bytes::from(s.as_str())),
}
}
}
impl Into<Bytes> for Binary {

View File

@ -18,7 +18,7 @@ use httprequest::HttpRequest;
pub trait ActorHttpContext: 'static {
fn disconnected(&mut self);
fn poll(&mut self) -> Poll<Option<SmallVec<[Frame; 2]>>, Error>;
fn poll(&mut self) -> Poll<Option<SmallVec<[Frame; 4]>>, Error>;
}
#[derive(Debug)]
@ -27,11 +27,20 @@ pub enum Frame {
Drain(oneshot::Sender<()>),
}
impl Frame {
pub fn len(&self) -> usize {
match *self {
Frame::Chunk(Some(ref bin)) => bin.len(),
_ => 0,
}
}
}
/// Http actor execution context
pub struct HttpContext<A, S=()> where A: Actor<Context=HttpContext<A, S>>,
{
inner: ContextImpl<A>,
stream: Option<SmallVec<[Frame; 2]>>,
stream: Option<SmallVec<[Frame; 4]>>,
request: HttpRequest<S>,
disconnected: bool,
}
@ -51,16 +60,25 @@ impl<A, S> ActorContext for HttpContext<A, S> where A: Actor<Context=Self>
impl<A, S> AsyncContext<A> for HttpContext<A, S> where A: Actor<Context=Self>
{
#[inline]
fn spawn<F>(&mut self, fut: F) -> SpawnHandle
where F: ActorFuture<Item=(), Error=(), Actor=A> + 'static
{
self.inner.spawn(fut)
}
#[inline]
fn wait<F>(&mut self, fut: F)
where F: ActorFuture<Item=(), Error=(), Actor=A> + 'static
{
self.inner.wait(fut)
}
#[doc(hidden)]
#[inline]
fn waiting(&self) -> bool {
self.inner.waiting() || self.inner.state() == ActorState::Stopping ||
self.inner.state() == ActorState::Stopped
}
#[inline]
fn cancel_future(&mut self, handle: SpawnHandle) -> bool {
self.inner.cancel_future(handle)
}
@ -127,7 +145,7 @@ impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> {
}
}
/// Indicate end of streamimng payload. Also this method calls `Self::close`.
/// Indicate end of streaming payload. Also this method calls `Self::close`.
#[inline]
pub fn write_eof(&mut self) {
self.add_frame(Frame::Chunk(None));
@ -184,7 +202,7 @@ impl<A, S> ActorHttpContext for HttpContext<A, S> where A: Actor<Context=Self>,
self.stop();
}
fn poll(&mut self) -> Poll<Option<SmallVec<[Frame; 2]>>, Error> {
fn poll(&mut self) -> Poll<Option<SmallVec<[Frame; 4]>>, Error> {
let ctx: &mut HttpContext<A, S> = unsafe {
std::mem::transmute(self as &mut HttpContext<A, S>)
};

View File

@ -9,8 +9,8 @@ use std::error::Error as StdError;
use cookie;
use httparse;
use failure::Fail;
use futures::Canceled;
use failure::{Fail, Backtrace};
use http2::Error as Http2Error;
use http::{header, StatusCode, Error as HttpError};
use http::uri::InvalidUriBytes;
@ -22,6 +22,8 @@ use url::ParseError as UrlParseError;
pub use cookie::{ParseError as CookieParseError};
use body::Body;
use handler::Responder;
use httprequest::HttpRequest;
use httpresponse::HttpResponse;
use httpcodes::{HTTPBadRequest, HTTPMethodNotAllowed, HTTPExpectationFailed};
@ -33,9 +35,9 @@ use httpcodes::{HTTPBadRequest, HTTPMethodNotAllowed, HTTPExpectationFailed};
pub type Result<T, E=Error> = result::Result<T, E>;
/// General purpose actix web error
#[derive(Fail, Debug)]
pub struct Error {
cause: Box<ResponseError>,
backtrace: Option<Backtrace>,
}
impl Error {
@ -64,6 +66,16 @@ impl fmt::Display for Error {
}
}
impl fmt::Debug for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if let Some(bt) = self.cause.backtrace() {
write!(f, "{:?}\n\n{:?}", &self.cause, bt)
} else {
write!(f, "{:?}\n\n{:?}", &self.cause, self.backtrace.as_ref().unwrap())
}
}
}
/// `HttpResponse` for `Error`
impl From<Error> for HttpResponse {
fn from(err: Error) -> Self {
@ -74,7 +86,12 @@ impl From<Error> for HttpResponse {
/// `Error` for any error that implements `ResponseError`
impl<T: ResponseError> From<T> for Error {
fn from(err: T) -> Error {
Error { cause: Box::new(err) }
let backtrace = if err.backtrace().is_none() {
Some(Backtrace::new())
} else {
None
};
Error { cause: Box::new(err), backtrace: backtrace }
}
}
@ -320,7 +337,7 @@ pub enum WsHandshakeError {
/// Only get method is allowed
#[fail(display="Method not allowed")]
GetMethodRequired,
/// Ugrade header if not set to websocket
/// Upgrade header if not set to websocket
#[fail(display="Websocket upgrade is expected")]
NoWebsocketUpgrade,
/// Connection header is not set to upgrade
@ -329,7 +346,7 @@ pub enum WsHandshakeError {
/// Websocket version header is not set
#[fail(display="Websocket version header is required")]
NoVersionHeader,
/// Unsupported websockt version
/// Unsupported websocket version
#[fail(display="Unsupported version")]
UnsupportedVersion,
/// Websocket key is not set or wrong
@ -478,39 +495,10 @@ impl From<UrlParseError> for UrlGenerationError {
}
}
macro_rules! ERROR_WRAP {
($type:ty, $status:expr) => {
unsafe impl<T> Sync for $type {}
unsafe impl<T> Send for $type {}
impl<T> $type {
pub fn cause(&self) -> &T {
&self.0
}
}
impl<T: fmt::Debug + 'static> Fail for $type {}
impl<T: fmt::Debug + 'static> fmt::Display for $type {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{:?}", self.0)
}
}
impl<T> ResponseError for $type
where T: Send + Sync + fmt::Debug + 'static,
{
fn error_response(&self) -> HttpResponse {
HttpResponse::new($status, Body::Empty)
}
}
}
}
/// Helper type that can wrap any error and generate *BAD REQUEST* response.
/// Helper type that can wrap any error and generate custom response.
///
/// In following example any `io::Error` will be converted into "BAD REQUEST" response
/// as oposite to *INNTERNAL SERVER ERROR* which is defined by default.
/// as opposite to *INNTERNAL SERVER ERROR* which is defined by default.
///
/// ```rust
/// # extern crate actix_web;
@ -523,59 +511,133 @@ macro_rules! ERROR_WRAP {
/// }
/// # fn main() {}
/// ```
#[derive(Debug)]
pub struct ErrorBadRequest<T>(pub T);
ERROR_WRAP!(ErrorBadRequest<T>, StatusCode::BAD_REQUEST);
pub struct InternalError<T> {
cause: T,
status: StatusCode,
backtrace: Backtrace,
}
#[derive(Debug)]
/// Helper type that can wrap any error and generate *UNAUTHORIZED* response.
pub struct ErrorUnauthorized<T>(pub T);
ERROR_WRAP!(ErrorUnauthorized<T>, StatusCode::UNAUTHORIZED);
unsafe impl<T> Sync for InternalError<T> {}
unsafe impl<T> Send for InternalError<T> {}
#[derive(Debug)]
/// Helper type that can wrap any error and generate *FORBIDDEN* response.
pub struct ErrorForbidden<T>(pub T);
ERROR_WRAP!(ErrorForbidden<T>, StatusCode::FORBIDDEN);
impl<T> InternalError<T> {
pub fn new(err: T, status: StatusCode) -> Self {
InternalError {
cause: err,
status: status,
backtrace: Backtrace::new(),
}
}
}
#[derive(Debug)]
/// Helper type that can wrap any error and generate *NOT FOUND* response.
pub struct ErrorNotFound<T>(pub T);
ERROR_WRAP!(ErrorNotFound<T>, StatusCode::NOT_FOUND);
impl<T> Fail for InternalError<T>
where T: Send + Sync + fmt::Debug + 'static
{
fn backtrace(&self) -> Option<&Backtrace> {
Some(&self.backtrace)
}
}
#[derive(Debug)]
/// Helper type that can wrap any error and generate *METHOD NOT ALLOWED* response.
pub struct ErrorMethodNotAllowed<T>(pub T);
ERROR_WRAP!(ErrorMethodNotAllowed<T>, StatusCode::METHOD_NOT_ALLOWED);
impl<T> fmt::Debug for InternalError<T>
where T: Send + Sync + fmt::Debug + 'static
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&self.cause, f)
}
}
#[derive(Debug)]
/// Helper type that can wrap any error and generate *REQUEST TIMEOUT* response.
pub struct ErrorRequestTimeout<T>(pub T);
ERROR_WRAP!(ErrorRequestTimeout<T>, StatusCode::REQUEST_TIMEOUT);
impl<T> fmt::Display for InternalError<T>
where T: Send + Sync + fmt::Debug + 'static
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&self.cause, f)
}
}
#[derive(Debug)]
/// Helper type that can wrap any error and generate *CONFLICT* response.
pub struct ErrorConflict<T>(pub T);
ERROR_WRAP!(ErrorConflict<T>, StatusCode::CONFLICT);
impl<T> ResponseError for InternalError<T>
where T: Send + Sync + fmt::Debug + 'static
{
fn error_response(&self) -> HttpResponse {
HttpResponse::new(self.status, Body::Empty)
}
}
#[derive(Debug)]
/// Helper type that can wrap any error and generate *GONE* response.
pub struct ErrorGone<T>(pub T);
ERROR_WRAP!(ErrorGone<T>, StatusCode::GONE);
impl<T> Responder for InternalError<T>
where T: Send + Sync + fmt::Debug + 'static
{
type Item = HttpResponse;
type Error = Error;
#[derive(Debug)]
/// Helper type that can wrap any error and generate *PRECONDITION FAILED* response.
pub struct ErrorPreconditionFailed<T>(pub T);
ERROR_WRAP!(ErrorPreconditionFailed<T>, StatusCode::PRECONDITION_FAILED);
fn respond_to(self, _: HttpRequest) -> Result<HttpResponse, Error> {
Err(self.into())
}
}
#[derive(Debug)]
/// Helper type that can wrap any error and generate *EXPECTATION FAILED* response.
pub struct ErrorExpectationFailed<T>(pub T);
ERROR_WRAP!(ErrorExpectationFailed<T>, StatusCode::EXPECTATION_FAILED);
/// Helper function that creates wrapper of any error and generate *BAD REQUEST* response.
#[allow(non_snake_case)]
pub fn ErrorBadRequest<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::BAD_REQUEST)
}
#[derive(Debug)]
/// Helper type that can wrap any error and generate *INTERNAL SERVER ERROR* response.
pub struct ErrorInternalServerError<T>(pub T);
ERROR_WRAP!(ErrorInternalServerError<T>, StatusCode::INTERNAL_SERVER_ERROR);
/// Helper function that creates wrapper of any error and generate *UNAUTHORIZED* response.
#[allow(non_snake_case)]
pub fn ErrorUnauthorized<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::UNAUTHORIZED)
}
/// Helper function that creates wrapper of any error and generate *FORBIDDEN* response.
#[allow(non_snake_case)]
pub fn ErrorForbidden<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::FORBIDDEN)
}
/// Helper function that creates wrapper of any error and generate *NOT FOUND* response.
#[allow(non_snake_case)]
pub fn ErrorNotFound<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::NOT_FOUND)
}
/// Helper function that creates wrapper of any error and generate *METHOD NOT ALLOWED* response.
#[allow(non_snake_case)]
pub fn ErrorMethodNotAllowed<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::METHOD_NOT_ALLOWED)
}
/// Helper function that creates wrapper of any error and generate *REQUEST TIMEOUT* response.
#[allow(non_snake_case)]
pub fn ErrorRequestTimeout<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::REQUEST_TIMEOUT)
}
/// Helper function that creates wrapper of any error and generate *CONFLICT* response.
#[allow(non_snake_case)]
pub fn ErrorConflict<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::CONFLICT)
}
/// Helper function that creates wrapper of any error and generate *GONE* response.
#[allow(non_snake_case)]
pub fn ErrorGone<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::GONE)
}
/// Helper function that creates wrapper of any error and generate *PRECONDITION FAILED* response.
#[allow(non_snake_case)]
pub fn ErrorPreconditionFailed<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::PRECONDITION_FAILED)
}
/// Helper function that creates wrapper of any error and generate *EXPECTATION FAILED* response.
#[allow(non_snake_case)]
pub fn ErrorExpectationFailed<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::EXPECTATION_FAILED)
}
/// Helper function that creates wrapper of any error and generate *INTERNAL SERVER ERROR* response.
#[allow(non_snake_case)]
pub fn ErrorInternalServerError<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::INTERNAL_SERVER_ERROR)
}
#[cfg(test)]
mod tests {

View File

@ -9,7 +9,7 @@ use error::Error;
use httprequest::HttpRequest;
use httpresponse::HttpResponse;
/// Trait defines object that could be regestered as route handler
/// Trait defines object that could be registered as route handler
#[allow(unused_variables)]
pub trait Handler<S>: 'static {
@ -35,7 +35,7 @@ pub trait Responder {
}
#[doc(hidden)]
/// Convinience trait that convert `Future` object into `Boxed` future
/// Convenience trait that convert `Future` object into `Boxed` future
pub trait AsyncResponder<I, E>: Sized {
fn responder(self) -> Box<Future<Item=I, Error=E>>;
}
@ -193,7 +193,7 @@ impl<I, E> Responder for Box<Future<Item=I, Error=E>>
}
}
/// Trait defines object that could be regestered as resource route
/// Trait defines object that could be registered as resource route
pub(crate) trait RouteHandler<S>: 'static {
fn handle(&mut self, req: HttpRequest<S>) -> Reply;
}
@ -341,7 +341,7 @@ impl Default for NormalizePath {
}
impl NormalizePath {
/// Create new `NoramlizePath` instance
/// Create new `NormalizePath` instance
pub fn new(append: bool, merge: bool, redirect: StatusCode) -> NormalizePath {
NormalizePath {
append: append,

View File

@ -66,84 +66,6 @@ impl fmt::Write for CachedDate {
}
}
/// Internal use only! unsafe
#[derive(Debug)]
pub(crate) struct SharedBytesPool(RefCell<VecDeque<Rc<BytesMut>>>);
impl SharedBytesPool {
pub fn new() -> SharedBytesPool {
SharedBytesPool(RefCell::new(VecDeque::with_capacity(128)))
}
pub fn get_bytes(&self) -> Rc<BytesMut> {
if let Some(bytes) = self.0.borrow_mut().pop_front() {
bytes
} else {
Rc::new(BytesMut::new())
}
}
pub fn release_bytes(&self, mut bytes: Rc<BytesMut>) {
let v = &mut self.0.borrow_mut();
if v.len() < 128 {
Rc::get_mut(&mut bytes).unwrap().take();
v.push_front(bytes);
}
}
}
#[derive(Debug)]
pub(crate) struct SharedBytes(
Option<Rc<BytesMut>>, Option<Rc<SharedBytesPool>>);
impl Drop for SharedBytes {
fn drop(&mut self) {
if let Some(ref pool) = self.1 {
if let Some(bytes) = self.0.take() {
if Rc::strong_count(&bytes) == 1 {
pool.release_bytes(bytes);
}
}
}
}
}
impl SharedBytes {
pub fn empty() -> Self {
SharedBytes(None, None)
}
pub fn new(bytes: Rc<BytesMut>, pool: Rc<SharedBytesPool>) -> SharedBytes {
SharedBytes(Some(bytes), Some(pool))
}
#[inline(always)]
#[allow(mutable_transmutes)]
#[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref, inline_always))]
pub fn get_mut(&self) -> &mut BytesMut {
let r: &BytesMut = self.0.as_ref().unwrap().as_ref();
unsafe{mem::transmute(r)}
}
#[inline]
pub fn get_ref(&self) -> &BytesMut {
self.0.as_ref().unwrap()
}
}
impl Default for SharedBytes {
fn default() -> Self {
SharedBytes(Some(Rc::new(BytesMut::new())), None)
}
}
impl Clone for SharedBytes {
fn clone(&self) -> SharedBytes {
SharedBytes(self.0.clone(), self.1.clone())
}
}
/// Internal use only! unsafe
pub(crate) struct SharedMessagePool(RefCell<VecDeque<Rc<HttpMessage>>>);

View File

@ -222,7 +222,7 @@ impl<S> HttpRequest<S> {
self.uri().path()
}
/// Get *ConnectionInfo* for currect request.
/// Get *ConnectionInfo* for correct request.
pub fn connection_info(&self) -> &ConnectionInfo {
if self.as_ref().info.is_none() {
let info: ConnectionInfo<'static> = unsafe{
@ -278,7 +278,7 @@ impl<S> HttpRequest<S> {
/// Peer socket address
///
/// Peer address is actuall socket address, if proxy is used in front of
/// Peer address is actual socket address, if proxy is used in front of
/// actix http server, then peer address would be address of this proxy.
///
/// To get client connection information `connection_info()` method should be used.

View File

@ -164,8 +164,8 @@ impl HttpResponse {
/// Content encoding
#[inline]
pub fn content_encoding(&self) -> &ContentEncoding {
&self.get_ref().encoding
pub fn content_encoding(&self) -> ContentEncoding {
self.get_ref().encoding
}
/// Set content encoding
@ -423,8 +423,8 @@ impl HttpResponseBuilder {
}
/// This method calls provided closure with builder reference if value is Some.
pub fn if_some<T, F>(&mut self, value: Option<&T>, f: F) -> &mut Self
where F: FnOnce(&T, &mut HttpResponseBuilder)
pub fn if_some<T, F>(&mut self, value: Option<T>, f: F) -> &mut Self
where F: FnOnce(T, &mut HttpResponseBuilder)
{
if let Some(val) = value {
f(val, self);
@ -812,11 +812,11 @@ mod tests {
#[test]
fn test_content_encoding() {
let resp = HttpResponse::build(StatusCode::OK).finish().unwrap();
assert_eq!(*resp.content_encoding(), ContentEncoding::Auto);
assert_eq!(resp.content_encoding(), ContentEncoding::Auto);
let resp = HttpResponse::build(StatusCode::OK)
.content_encoding(ContentEncoding::Br).finish().unwrap();
assert_eq!(*resp.content_encoding(), ContentEncoding::Br);
assert_eq!(resp.content_encoding(), ContentEncoding::Br);
}
#[test]

View File

@ -214,7 +214,7 @@ impl Cors {
/// This method register cors middleware with resource and
/// adds route for *OPTIONS* preflight requests.
///
/// It is possible to register *Cors* middlware with `Resource::middleware()`
/// It is possible to register *Cors* middleware with `Resource::middleware()`
/// method, but in that case *Cors* middleware wont be able to handle *OPTIONS*
/// requests.
pub fn register<S: 'static>(self, resource: &mut Resource<S>) {
@ -295,16 +295,23 @@ impl<S> Middleware<S> for Cors {
self.validate_allowed_method(req)?;
self.validate_allowed_headers(req)?;
// allowed headers
let headers = if let Some(headers) = self.headers.as_ref() {
Some(HeaderValue::try_from(&headers.iter().fold(
String::new(), |s, v| s + "," + v.as_str()).as_str()[1..]).unwrap())
} else if let Some(hdr) = req.headers().get(header::ACCESS_CONTROL_REQUEST_HEADERS) {
Some(hdr.clone())
} else {
None
};
Ok(Started::Response(
HTTPOk.build()
.if_some(self.max_age.as_ref(), |max_age, resp| {
let _ = resp.header(
header::ACCESS_CONTROL_MAX_AGE, format!("{}", max_age).as_str());})
.if_some(self.headers.as_ref(), |headers, resp| {
let _ = resp.header(
header::ACCESS_CONTROL_ALLOW_HEADERS,
&headers.iter().fold(
String::new(), |s, v| s + "," + v.as_str()).as_str()[1..]);})
.if_some(headers, |headers, resp| {
let _ = resp.header(header::ACCESS_CONTROL_ALLOW_HEADERS, headers); })
.if_true(self.origins.is_all(), |resp| {
if self.send_wildcard {
resp.header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*");

View File

@ -217,7 +217,7 @@ pub struct CookieSession {
inner: Rc<CookieSessionInner>,
}
/// Errors that can occure during handling cookie session
/// Errors that can occur during handling cookie session
#[derive(Fail, Debug)]
pub enum CookieSessionError {
/// Size of the serialized session is greater than 4000 bytes.

View File

@ -6,7 +6,7 @@ use std::slice::Iter;
use std::borrow::Cow;
use smallvec::SmallVec;
use error::{ResponseError, UriSegmentError, ErrorBadRequest};
use error::{ResponseError, UriSegmentError, InternalError, ErrorBadRequest};
/// A trait to abstract the idea of creating a new instance of a type from a path parameter.
@ -77,7 +77,7 @@ impl<'a> Params<'a> {
}
}
/// Return iterator to items in paramter container
/// Return iterator to items in parameter container
pub fn iter(&self) -> Iter<(Cow<'a, str>, Cow<'a, str>)> {
self.0.iter()
}
@ -141,7 +141,7 @@ impl FromParam for PathBuf {
macro_rules! FROM_STR {
($type:ty) => {
impl FromParam for $type {
type Err = ErrorBadRequest<<$type as FromStr>::Err>;
type Err = InternalError<<$type as FromStr>::Err>;
fn from_param(val: &str) -> Result<Self, Self::Err> {
<$type as FromStr>::from_str(val).map_err(ErrorBadRequest)

View File

@ -420,7 +420,7 @@ impl Inner {
}
pub fn readall(&mut self) -> Option<Bytes> {
let len = self.items.iter().fold(0, |cur, item| cur + item.len());
let len = self.items.iter().map(|b| b.len()).sum();
if len > 0 {
let mut buf = BytesMut::with_capacity(len);
for item in &self.items {

View File

@ -3,6 +3,7 @@ use std::rc::Rc;
use std::cell::RefCell;
use std::marker::PhantomData;
use log::Level::Debug;
use futures::{Async, Poll, Future, Stream};
use futures::unsync::oneshot;
@ -56,7 +57,7 @@ impl<S: 'static, H: PipelineHandler<S>> PipelineState<S, H> {
struct PipelineInfo<S> {
req: HttpRequest<S>,
count: usize,
count: u16,
mws: Rc<Vec<Box<Middleware<S>>>>,
context: Option<Box<ActorHttpContext>>,
error: Option<Error>,
@ -210,14 +211,14 @@ impl<S: 'static, H: PipelineHandler<S>> StartMiddlewares<S, H> {
fn init(info: &mut PipelineInfo<S>, handler: Rc<RefCell<H>>) -> PipelineState<S, H> {
// execute middlewares, we need this stage because middlewares could be non-async
// and we can move to next state immidietly
let len = info.mws.len();
// and we can move to next state immediately
let len = info.mws.len() as u16;
loop {
if info.count == len {
let reply = handler.borrow_mut().handle(info.req.clone());
return WaitingResponse::init(info, reply)
} else {
match info.mws[info.count].start(&mut info.req) {
match info.mws[info.count as usize].start(&mut info.req) {
Ok(Started::Done) =>
info.count += 1,
Ok(Started::Response(resp)) =>
@ -246,7 +247,7 @@ impl<S: 'static, H: PipelineHandler<S>> StartMiddlewares<S, H> {
}
fn poll(&mut self, info: &mut PipelineInfo<S>) -> Option<PipelineState<S, H>> {
let len = info.mws.len();
let len = info.mws.len() as u16;
'outer: loop {
match self.fut.as_mut().unwrap().poll() {
Ok(Async::NotReady) => return None,
@ -260,7 +261,7 @@ impl<S: 'static, H: PipelineHandler<S>> StartMiddlewares<S, H> {
return Some(WaitingResponse::init(info, reply));
} else {
loop {
match info.mws[info.count].start(info.req_mut()) {
match info.mws[info.count as usize].start(info.req_mut()) {
Ok(Started::Done) =>
info.count += 1,
Ok(Started::Response(resp)) => {
@ -334,7 +335,7 @@ impl<S: 'static, H> RunMiddlewares<S, H> {
loop {
resp = match info.mws[curr].response(info.req_mut(), resp) {
Err(err) => {
info.count = curr + 1;
info.count = (curr + 1) as u16;
return ProcessResponse::init(err.into())
}
Ok(Response::Done(r)) => {
@ -458,6 +459,13 @@ impl<S: 'static, H> ProcessResponse<S, H> {
}
};
if let Some(err) = self.resp.error() {
warn!("Error occured during request handling: {}", err);
if log_enabled!(Debug) {
debug!("{:?}", err);
}
}
match self.resp.replace_body(Body::Empty) {
Body::Streaming(stream) =>
self.iostate = IOState::Payload(stream),
@ -480,7 +488,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
},
Ok(Async::Ready(Some(chunk))) => {
self.iostate = IOState::Payload(body);
match io.write(chunk.as_ref()) {
match io.write(chunk.into()) {
Err(err) => {
info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(info, self.resp))
@ -522,7 +530,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
break 'outter
},
Frame::Chunk(Some(chunk)) => {
match io.write(chunk.as_ref()) {
match io.write(chunk) {
Err(err) => {
info.error = Some(err.into());
return Ok(
@ -575,16 +583,6 @@ impl<S: 'static, H> ProcessResponse<S, H> {
if self.running == RunningState::Paused || self.drain.is_some() {
match io.poll_completed(false) {
Ok(Async::Ready(_)) => {
match io.flush() {
Ok(Async::Ready(_)) => (),
Ok(Async::NotReady) => return Err(PipelineState::Response(self)),
Err(err) => {
debug!("Error sending data: {}", err);
info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(info, self.resp))
}
}
self.running.resume();
// resolve drain futures
@ -596,7 +594,6 @@ impl<S: 'static, H> ProcessResponse<S, H> {
},
Ok(Async::NotReady) => return Err(PipelineState::Response(self)),
Err(err) => {
debug!("Error sending data: {}", err);
info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(info, self.resp))
}
@ -609,7 +606,6 @@ impl<S: 'static, H> ProcessResponse<S, H> {
match io.write_eof() {
Ok(_) => (),
Err(err) => {
debug!("Error sending data: {}", err);
info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(info, self.resp))
}
@ -671,7 +667,7 @@ impl<S: 'static, H> FinishingMiddlewares<S, H> {
self.fut = None;
info.count -= 1;
match info.mws[info.count].finish(info.req_mut(), &self.resp) {
match info.mws[info.count as usize].finish(info.req_mut(), &self.resp) {
Finished::Done => {
if info.count == 0 {
return Some(Completed::init(info))
@ -692,6 +688,10 @@ impl<S, H> Completed<S, H> {
#[inline]
fn init(info: &mut PipelineInfo<S>) -> PipelineState<S, H> {
if let Some(ref err) = info.error {
error!("Error occured during request handling: {}", err);
}
if info.context.is_none() {
PipelineState::None
} else {

View File

@ -19,7 +19,7 @@ use httpresponse::HttpResponse;
/// Route uses builder-like pattern for configuration.
/// During request handling, resource object iterate through all routes
/// and check all predicates for specific route, if request matches all predicates route
/// route considired matched and route handler get called.
/// route considered matched and route handler get called.
///
/// ```rust
/// # extern crate actix_web;

View File

@ -3,7 +3,7 @@ use std::io::{Read, Write};
use std::fmt::Write as FmtWrite;
use std::str::FromStr;
use http::Version;
use http::{Version, Method, HttpTryFrom};
use http::header::{HeaderMap, HeaderValue,
ACCEPT_ENCODING, CONNECTION,
CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING};
@ -16,11 +16,13 @@ use bytes::{Bytes, BytesMut, BufMut, Writer};
use headers::ContentEncoding;
use body::{Body, Binary};
use error::PayloadError;
use helpers::SharedBytes;
use httprequest::HttpMessage;
use httpresponse::HttpResponse;
use payload::{PayloadSender, PayloadWriter};
use super::shared::SharedBytes;
impl ContentEncoding {
#[inline]
@ -344,15 +346,17 @@ impl PayloadEncoder {
pub fn new(buf: SharedBytes, req: &HttpMessage, resp: &mut HttpResponse) -> PayloadEncoder {
let version = resp.version().unwrap_or_else(|| req.version);
let mut body = resp.replace_body(Body::Empty);
let response_encoding = resp.content_encoding();
let has_body = match body {
Body::Empty => false,
Body::Binary(ref bin) => bin.len() >= 512,
Body::Binary(ref bin) =>
!(response_encoding == ContentEncoding::Auto && bin.len() < 96),
_ => true,
};
// Enable content encoding only if response does not contain Content-Encoding header
let mut encoding = if has_body {
let encoding = match *resp.content_encoding() {
let encoding = match response_encoding {
ContentEncoding::Auto => {
// negotiate content-encoding
if let Some(val) = req.headers.get(ACCEPT_ENCODING) {
@ -376,10 +380,12 @@ impl PayloadEncoder {
ContentEncoding::Identity
};
let transfer = match body {
let mut transfer = match body {
Body::Empty => {
resp.headers_mut().remove(CONTENT_LENGTH);
TransferEncoding::eof(buf)
if req.method != Method::HEAD {
resp.headers_mut().remove(CONTENT_LENGTH);
}
TransferEncoding::length(0, buf)
},
Body::Binary(ref mut bytes) => {
if encoding.is_compression() {
@ -396,13 +402,20 @@ impl PayloadEncoder {
ContentEncoding::Auto => unreachable!()
};
// TODO return error!
let _ = enc.write(bytes.as_ref());
let _ = enc.write(bytes.clone());
let _ = enc.write_eof();
*bytes = Binary::from(tmp.get_mut().take());
*bytes = Binary::from(tmp.take());
encoding = ContentEncoding::Identity;
}
resp.headers_mut().remove(CONTENT_LENGTH);
if req.method == Method::HEAD {
let mut b = BytesMut::new();
let _ = write!(b, "{}", bytes.len());
resp.headers_mut().insert(
CONTENT_LENGTH, HeaderValue::try_from(b.freeze()).unwrap());
} else {
resp.headers_mut().remove(CONTENT_LENGTH);
}
TransferEncoding::eof(buf)
}
Body::Streaming(_) | Body::Actor(_) => {
@ -423,7 +436,12 @@ impl PayloadEncoder {
}
}
};
resp.replace_body(body);
//
if req.method == Method::HEAD {
transfer.kind = TransferEncodingKind::Length(0);
} else {
resp.replace_body(body);
}
PayloadEncoder(
match encoding {
@ -503,16 +521,6 @@ impl PayloadEncoder {
impl PayloadEncoder {
#[inline]
pub fn len(&self) -> usize {
self.0.get_ref().len()
}
#[inline]
pub fn get_mut(&mut self) -> &mut BytesMut {
self.0.get_mut()
}
#[inline]
pub fn is_eof(&self) -> bool {
self.0.is_eof()
@ -520,7 +528,7 @@ impl PayloadEncoder {
#[cfg_attr(feature = "cargo-clippy", allow(inline_always))]
#[inline(always)]
pub fn write(&mut self, payload: &[u8]) -> Result<(), io::Error> {
pub fn write(&mut self, payload: Binary) -> Result<(), io::Error> {
self.0.write(payload)
}
@ -543,42 +551,10 @@ impl ContentEncoder {
#[inline]
pub fn is_eof(&self) -> bool {
match *self {
ContentEncoder::Br(ref encoder) =>
encoder.get_ref().is_eof(),
ContentEncoder::Deflate(ref encoder) =>
encoder.get_ref().is_eof(),
ContentEncoder::Gzip(ref encoder) =>
encoder.get_ref().is_eof(),
ContentEncoder::Identity(ref encoder) =>
encoder.is_eof(),
}
}
#[inline]
pub fn get_ref(&self) -> &BytesMut {
match *self {
ContentEncoder::Br(ref encoder) =>
encoder.get_ref().buffer.get_ref(),
ContentEncoder::Deflate(ref encoder) =>
encoder.get_ref().buffer.get_ref(),
ContentEncoder::Gzip(ref encoder) =>
encoder.get_ref().buffer.get_ref(),
ContentEncoder::Identity(ref encoder) =>
encoder.buffer.get_ref(),
}
}
#[inline]
pub fn get_mut(&mut self) -> &mut BytesMut {
match *self {
ContentEncoder::Br(ref mut encoder) =>
encoder.get_mut().buffer.get_mut(),
ContentEncoder::Deflate(ref mut encoder) =>
encoder.get_mut().buffer.get_mut(),
ContentEncoder::Gzip(ref mut encoder) =>
encoder.get_mut().buffer.get_mut(),
ContentEncoder::Identity(ref mut encoder) =>
encoder.buffer.get_mut(),
ContentEncoder::Br(ref encoder) => encoder.get_ref().is_eof(),
ContentEncoder::Deflate(ref encoder) => encoder.get_ref().is_eof(),
ContentEncoder::Gzip(ref encoder) => encoder.get_ref().is_eof(),
ContentEncoder::Identity(ref encoder) => encoder.is_eof(),
}
}
@ -629,10 +605,10 @@ impl ContentEncoder {
#[cfg_attr(feature = "cargo-clippy", allow(inline_always))]
#[inline(always)]
pub fn write(&mut self, data: &[u8]) -> Result<(), io::Error> {
pub fn write(&mut self, data: Binary) -> Result<(), io::Error> {
match *self {
ContentEncoder::Br(ref mut encoder) => {
match encoder.write(data) {
match encoder.write(data.as_ref()) {
Ok(_) =>
encoder.flush(),
Err(err) => {
@ -642,7 +618,7 @@ impl ContentEncoder {
}
},
ContentEncoder::Gzip(ref mut encoder) => {
match encoder.write(data) {
match encoder.write(data.as_ref()) {
Ok(_) =>
encoder.flush(),
Err(err) => {
@ -652,7 +628,7 @@ impl ContentEncoder {
}
}
ContentEncoder::Deflate(ref mut encoder) => {
match encoder.write(data) {
match encoder.write(data.as_ref()) {
Ok(_) =>
encoder.flush(),
Err(err) => {
@ -686,7 +662,7 @@ enum TransferEncodingKind {
Length(u64),
/// An Encoder for when Content-Length is not known.
///
/// Appliction decides when to stop writing.
/// Application decides when to stop writing.
Eof,
}
@ -727,11 +703,12 @@ impl TransferEncoding {
/// Encode message. Return `EOF` state of encoder
#[inline]
pub fn encode(&mut self, msg: &[u8]) -> io::Result<bool> {
pub fn encode(&mut self, mut msg: Binary) -> io::Result<bool> {
match self.kind {
TransferEncodingKind::Eof => {
self.buffer.get_mut().extend_from_slice(msg);
Ok(msg.is_empty())
let eof = msg.is_empty();
self.buffer.extend(msg);
Ok(eof)
},
TransferEncodingKind::Chunked(ref mut eof) => {
if *eof {
@ -740,24 +717,31 @@ impl TransferEncoding {
if msg.is_empty() {
*eof = true;
self.buffer.get_mut().extend_from_slice(b"0\r\n\r\n");
self.buffer.extend_from_slice(b"0\r\n\r\n");
} else {
write!(self.buffer.get_mut(), "{:X}\r\n", msg.len())
let mut buf = BytesMut::new();
write!(&mut buf, "{:X}\r\n", msg.len())
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
self.buffer.get_mut().extend_from_slice(msg);
self.buffer.get_mut().extend_from_slice(b"\r\n");
self.buffer.reserve(buf.len() + msg.len() + 2);
self.buffer.extend(buf.into());
self.buffer.extend(msg);
self.buffer.extend_from_slice(b"\r\n");
}
Ok(*eof)
},
TransferEncodingKind::Length(ref mut remaining) => {
if msg.is_empty() {
return Ok(*remaining == 0)
}
let max = cmp::min(*remaining, msg.len() as u64);
self.buffer.get_mut().extend_from_slice(msg[..max as usize].as_ref());
if *remaining > 0 {
if msg.is_empty() {
return Ok(*remaining == 0)
}
let len = cmp::min(*remaining, msg.len() as u64);
self.buffer.extend(msg.take().split_to(len as usize).into());
*remaining -= max as u64;
Ok(*remaining == 0)
*remaining -= len as u64;
Ok(*remaining == 0)
} else {
Ok(true)
}
},
}
}
@ -770,7 +754,7 @@ impl TransferEncoding {
TransferEncodingKind::Chunked(ref mut eof) => {
if !*eof {
*eof = true;
self.buffer.get_mut().extend_from_slice(b"0\r\n\r\n");
self.buffer.extend_from_slice(b"0\r\n\r\n");
}
},
}
@ -781,7 +765,7 @@ impl io::Write for TransferEncoding {
#[inline]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.encode(buf)?;
self.encode(Binary::from_slice(buf))?;
Ok(buf.len())
}
@ -867,8 +851,8 @@ mod tests {
fn test_chunked_te() {
let bytes = SharedBytes::default();
let mut enc = TransferEncoding::chunked(bytes.clone());
assert!(!enc.encode(b"test").ok().unwrap());
assert!(enc.encode(b"").ok().unwrap());
assert!(!enc.encode(Binary::from(b"test".as_ref())).ok().unwrap());
assert!(enc.encode(Binary::from(b"".as_ref())).ok().unwrap());
assert_eq!(bytes.get_mut().take().freeze(),
Bytes::from_static(b"4\r\ntest\r\n0\r\n\r\n"));
}

View File

@ -96,12 +96,12 @@ impl<T, H> Http1<T, H>
}
}
// TODO: refacrtor
// TODO: refactor
#[cfg_attr(feature = "cargo-clippy", allow(cyclomatic_complexity))]
pub fn poll(&mut self) -> Poll<(), ()> {
// keep-alive timer
if self.keepalive_timer.is_some() {
match self.keepalive_timer.as_mut().unwrap().poll() {
if let Some(ref mut timer) = self.keepalive_timer {
match timer.poll() {
Ok(Async::Ready(_)) => {
trace!("Keep-alive timeout, close connection");
return Ok(Async::Ready(()))
@ -133,7 +133,7 @@ impl<T, H> Http1<T, H>
Ok(Async::Ready(ready)) => {
not_ready = false;
// overide keep-alive state
// override keep-alive state
if self.stream.keepalive() {
self.flags.insert(Flags::KEEPALIVE);
} else {
@ -146,10 +146,8 @@ impl<T, H> Http1<T, H>
item.flags.insert(EntryFlags::FINISHED);
}
},
Ok(Async::NotReady) => {
// no more IO for this iteration
io = true;
},
// no more IO for this iteration
Ok(Async::NotReady) => io = true,
Err(err) => {
// it is not possible to recover from error
// during pipe handling, so just drop connection
@ -227,38 +225,7 @@ impl<T, H> Http1<T, H>
self.tasks.push_back(
Entry {pipe: pipe.unwrap_or_else(|| Pipeline::error(HTTPNotFound)),
flags: EntryFlags::empty()});
}
Err(ReaderError::Disconnect) => {
not_ready = false;
self.flags.insert(Flags::ERROR);
self.stream.disconnected();
for entry in &mut self.tasks {
entry.pipe.disconnected()
}
},
Err(err) => {
// notify all tasks
not_ready = false;
self.stream.disconnected();
for entry in &mut self.tasks {
entry.pipe.disconnected()
}
// kill keepalive
self.flags.remove(Flags::KEEPALIVE);
self.keepalive_timer.take();
// on parse error, stop reading stream but tasks need to be completed
self.flags.insert(Flags::ERROR);
if self.tasks.is_empty() {
if let ReaderError::Error(err) = err {
self.tasks.push_back(
Entry {pipe: Pipeline::error(err.error_response()),
flags: EntryFlags::empty()});
}
}
}
Ok(Async::NotReady) => {
// start keep-alive timer, this also is slow request timeout
if self.tasks.is_empty() {
@ -293,7 +260,38 @@ impl<T, H> Http1<T, H>
}
}
break
}
},
Err(ReaderError::Disconnect) => {
not_ready = false;
self.flags.insert(Flags::ERROR);
self.stream.disconnected();
for entry in &mut self.tasks {
entry.pipe.disconnected()
}
},
Err(err) => {
// notify all tasks
not_ready = false;
self.stream.disconnected();
for entry in &mut self.tasks {
entry.pipe.disconnected()
}
// kill keepalive
self.flags.remove(Flags::KEEPALIVE);
self.keepalive_timer.take();
// on parse error, stop reading stream but tasks need to be completed
self.flags.insert(Flags::ERROR);
if self.tasks.is_empty() {
if let ReaderError::Error(err) = err {
self.tasks.push_back(
Entry {pipe: Pipeline::error(err.error_response()),
flags: EntryFlags::empty()});
}
}
},
}
}
@ -1204,6 +1202,7 @@ mod tests {
panic!("Error");
}
// type in chunked
let mut buf = Buffer::new(
"GET /test HTTP/1.1\r\n\
transfer-encoding: chnked\r\n\r\n");

View File

@ -2,15 +2,15 @@ use std::io;
use bytes::BufMut;
use futures::{Async, Poll};
use tokio_io::AsyncWrite;
use http::Version;
use http::{Method, Version};
use http::header::{HeaderValue, CONNECTION, DATE};
use helpers;
use body::Body;
use helpers::SharedBytes;
use body::{Body, Binary};
use httprequest::HttpMessage;
use httpresponse::HttpResponse;
use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE};
use super::shared::SharedBytes;
use super::encoding::PayloadEncoder;
const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific
@ -56,23 +56,25 @@ impl<T: AsyncWrite> H1Writer<T> {
}
pub fn disconnected(&mut self) {
self.encoder.get_mut().take();
self.buffer.take();
}
pub fn keepalive(&self) -> bool {
self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE)
}
fn write_to_stream(&mut self) -> Result<WriterState, io::Error> {
let buffer = self.encoder.get_mut();
while !buffer.is_empty() {
match self.stream.write(buffer.as_ref()) {
fn write_to_stream(&mut self) -> io::Result<WriterState> {
while !self.buffer.is_empty() {
match self.stream.write(self.buffer.as_ref()) {
Ok(0) => {
self.disconnected();
return Ok(WriterState::Done);
},
Ok(n) => {
let _ = buffer.split_to(n);
let _ = self.buffer.split_to(n);
},
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if buffer.len() > MAX_WRITE_BUFFER_SIZE {
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
return Ok(WriterState::Pause)
} else {
return Ok(WriterState::Done)
@ -92,23 +94,7 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
self.written
}
#[inline]
fn flush(&mut self) -> Poll<(), io::Error> {
match self.stream.flush() {
Ok(_) => Ok(Async::Ready(())),
Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock {
Ok(Async::NotReady)
} else {
Err(e)
}
}
}
}
fn start(&mut self, req: &mut HttpMessage, msg: &mut HttpResponse)
-> Result<WriterState, io::Error>
{
fn start(&mut self, req: &mut HttpMessage, msg: &mut HttpResponse) -> io::Result<WriterState> {
// prepare task
self.flags.insert(Flags::STARTED);
self.encoder = PayloadEncoder::new(self.buffer.clone(), req, msg);
@ -133,7 +119,7 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
// render message
{
let mut buffer = self.encoder.get_mut();
let mut buffer = self.buffer.get_mut();
if let Body::Binary(ref bytes) = body {
buffer.reserve(256 + msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len());
} else {
@ -146,7 +132,11 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
match body {
Body::Empty =>
buffer.extend_from_slice(b"\r\ncontent-length: 0\r\n"),
if req.method != Method::HEAD {
buffer.extend_from_slice(b"\r\ncontent-length: 0\r\n");
} else {
buffer.extend_from_slice(b"\r\n");
},
Body::Binary(ref bytes) =>
helpers::write_content_length(bytes.len(), &mut buffer),
_ =>
@ -176,40 +166,39 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
if let Body::Binary(bytes) = body {
self.written = bytes.len() as u64;
self.encoder.write(bytes.as_ref())?;
self.encoder.write(bytes)?;
} else {
msg.replace_body(body);
}
Ok(WriterState::Done)
}
fn write(&mut self, payload: &[u8]) -> Result<WriterState, io::Error> {
fn write(&mut self, payload: Binary) -> io::Result<WriterState> {
self.written += payload.len() as u64;
if !self.flags.contains(Flags::DISCONNECTED) {
if self.flags.contains(Flags::STARTED) {
// TODO: add warning, write after EOF
self.encoder.write(payload)?;
return Ok(WriterState::Done)
} else {
// might be response to EXCEPT
self.encoder.get_mut().extend_from_slice(payload)
self.buffer.extend_from_slice(payload.as_ref())
}
}
if self.encoder.len() > MAX_WRITE_BUFFER_SIZE {
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
Ok(WriterState::Pause)
} else {
Ok(WriterState::Done)
}
}
fn write_eof(&mut self) -> Result<WriterState, io::Error> {
fn write_eof(&mut self) -> io::Result<WriterState> {
self.encoder.write_eof()?;
if !self.encoder.is_eof() {
Err(io::Error::new(io::ErrorKind::Other,
"Last payload item, but eof is not reached"))
} else if self.encoder.len() > MAX_WRITE_BUFFER_SIZE {
} else if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
Ok(WriterState::Pause)
} else {
Ok(WriterState::Done)

View File

@ -7,11 +7,11 @@ use http::{Version, HttpTryFrom, Response};
use http::header::{HeaderValue, CONNECTION, TRANSFER_ENCODING, DATE, CONTENT_LENGTH};
use helpers;
use body::Body;
use helpers::SharedBytes;
use body::{Body, Binary};
use httprequest::HttpMessage;
use httpresponse::HttpResponse;
use super::encoding::PayloadEncoder;
use super::shared::SharedBytes;
use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE};
const CHUNK_SIZE: usize = 16_384;
@ -52,15 +52,13 @@ impl H2Writer {
}
}
fn write_to_stream(&mut self) -> Result<WriterState, io::Error> {
fn write_to_stream(&mut self) -> io::Result<WriterState> {
if !self.flags.contains(Flags::STARTED) {
return Ok(WriterState::Done)
}
if let Some(ref mut stream) = self.stream {
let buffer = self.encoder.get_mut();
if buffer.is_empty() {
if self.buffer.is_empty() {
if self.flags.contains(Flags::EOF) {
let _ = stream.send_data(Bytes::new(), true);
}
@ -70,7 +68,7 @@ impl H2Writer {
loop {
match stream.poll_capacity() {
Ok(Async::NotReady) => {
if buffer.len() > MAX_WRITE_BUFFER_SIZE {
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
return Ok(WriterState::Pause)
} else {
return Ok(WriterState::Done)
@ -80,15 +78,15 @@ impl H2Writer {
return Ok(WriterState::Done)
}
Ok(Async::Ready(Some(cap))) => {
let len = buffer.len();
let bytes = buffer.split_to(cmp::min(cap, len));
let eof = buffer.is_empty() && self.flags.contains(Flags::EOF);
let len = self.buffer.len();
let bytes = self.buffer.split_to(cmp::min(cap, len));
let eof = self.buffer.is_empty() && self.flags.contains(Flags::EOF);
self.written += bytes.len() as u64;
if let Err(err) = stream.send_data(bytes.freeze(), eof) {
return Err(io::Error::new(io::ErrorKind::Other, err))
} else if !buffer.is_empty() {
let cap = cmp::min(buffer.len(), CHUNK_SIZE);
} else if !self.buffer.is_empty() {
let cap = cmp::min(self.buffer.len(), CHUNK_SIZE);
stream.reserve_capacity(cap);
} else {
return Ok(WriterState::Pause)
@ -110,16 +108,7 @@ impl Writer for H2Writer {
self.written
}
#[inline]
fn flush(&mut self) -> Poll<(), io::Error> {
Ok(Async::Ready(()))
}
fn start(&mut self, req: &mut HttpMessage, msg: &mut HttpResponse)
-> Result<WriterState, io::Error>
{
// trace!("Prepare response with status: {:?}", msg.status());
fn start(&mut self, req: &mut HttpMessage, msg: &mut HttpResponse) -> io::Result<WriterState> {
// prepare response
self.flags.insert(Flags::STARTED);
self.encoder = PayloadEncoder::new(self.buffer.clone(), req, msg);
@ -172,9 +161,9 @@ impl Writer for H2Writer {
if let Body::Binary(bytes) = body {
self.flags.insert(Flags::EOF);
self.written = bytes.len() as u64;
self.encoder.write(bytes.as_ref())?;
self.encoder.write(bytes)?;
if let Some(ref mut stream) = self.stream {
stream.reserve_capacity(cmp::min(self.encoder.len(), CHUNK_SIZE));
stream.reserve_capacity(cmp::min(self.buffer.len(), CHUNK_SIZE));
}
Ok(WriterState::Pause)
} else {
@ -183,7 +172,7 @@ impl Writer for H2Writer {
}
}
fn write(&mut self, payload: &[u8]) -> Result<WriterState, io::Error> {
fn write(&mut self, payload: Binary) -> io::Result<WriterState> {
self.written = payload.len() as u64;
if !self.flags.contains(Flags::DISCONNECTED) {
@ -192,25 +181,25 @@ impl Writer for H2Writer {
self.encoder.write(payload)?;
} else {
// might be response for EXCEPT
self.encoder.get_mut().extend_from_slice(payload)
self.buffer.extend_from_slice(payload.as_ref())
}
}
if self.encoder.len() > MAX_WRITE_BUFFER_SIZE {
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
Ok(WriterState::Pause)
} else {
Ok(WriterState::Done)
}
}
fn write_eof(&mut self) -> Result<WriterState, io::Error> {
fn write_eof(&mut self) -> io::Result<WriterState> {
self.encoder.write_eof()?;
self.flags.insert(Flags::EOF);
if !self.encoder.is_eof() {
Err(io::Error::new(io::ErrorKind::Other,
"Last payload item, but eof is not reached"))
} else if self.encoder.len() > MAX_WRITE_BUFFER_SIZE {
} else if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
Ok(WriterState::Pause)
} else {
Ok(WriterState::Done)

View File

@ -15,11 +15,13 @@ mod h2;
mod h1writer;
mod h2writer;
mod settings;
mod shared;
mod utils;
pub use self::srv::HttpServer;
pub use self::settings::ServerSettings;
use body::Binary;
use error::Error;
use httprequest::{HttpMessage, HttpRequest};
use httpresponse::HttpResponse;
@ -54,6 +56,12 @@ pub trait HttpHandler: 'static {
fn handle(&mut self, req: HttpRequest) -> Result<Box<HttpHandlerTask>, HttpRequest>;
}
impl HttpHandler for Box<HttpHandler> {
fn handle(&mut self, req: HttpRequest) -> Result<Box<HttpHandlerTask>, HttpRequest> {
self.as_mut().handle(req)
}
}
pub trait HttpHandlerTask {
fn poll(&mut self) -> Poll<(), Error>;
@ -90,14 +98,11 @@ pub enum WriterState {
pub trait Writer {
fn written(&self) -> u64;
fn start(&mut self, req: &mut HttpMessage, resp: &mut HttpResponse)
-> Result<WriterState, io::Error>;
fn start(&mut self, req: &mut HttpMessage, resp: &mut HttpResponse) -> io::Result<WriterState>;
fn write(&mut self, payload: &[u8]) -> Result<WriterState, io::Error>;
fn write(&mut self, payload: Binary) -> io::Result<WriterState>;
fn write_eof(&mut self) -> Result<WriterState, io::Error>;
fn flush(&mut self) -> Poll<(), io::Error>;
fn write_eof(&mut self) -> io::Result<WriterState>;
fn poll_completed(&mut self, shutdown: bool) -> Poll<(), io::Error>;
}

View File

@ -4,6 +4,7 @@ use std::cell::{Cell, RefCell, RefMut};
use helpers;
use super::channel::Node;
use super::shared::{SharedBytes, SharedBytesPool};
/// Various server settings
#[derive(Debug, Clone)]
@ -63,7 +64,7 @@ pub(crate) struct WorkerSettings<H> {
h: RefCell<Vec<H>>,
enabled: bool,
keep_alive: u64,
bytes: Rc<helpers::SharedBytesPool>,
bytes: Rc<SharedBytesPool>,
messages: Rc<helpers::SharedMessagePool>,
channels: Cell<usize>,
node: Node<()>,
@ -75,7 +76,7 @@ impl<H> WorkerSettings<H> {
h: RefCell::new(h),
enabled: if let Some(ka) = keep_alive { ka > 0 } else { false },
keep_alive: keep_alive.unwrap_or(0),
bytes: Rc::new(helpers::SharedBytesPool::new()),
bytes: Rc::new(SharedBytesPool::new()),
messages: Rc::new(helpers::SharedMessagePool::new()),
channels: Cell::new(0),
node: Node::head(),
@ -102,8 +103,8 @@ impl<H> WorkerSettings<H> {
self.enabled
}
pub fn get_shared_bytes(&self) -> helpers::SharedBytes {
helpers::SharedBytes::new(self.bytes.get_bytes(), Rc::clone(&self.bytes))
pub fn get_shared_bytes(&self) -> SharedBytes {
SharedBytes::new(self.bytes.get_bytes(), Rc::clone(&self.bytes))
}
pub fn get_http_message(&self) -> helpers::SharedHttpMessage {

120
src/server/shared.rs Normal file
View File

@ -0,0 +1,120 @@
use std::mem;
use std::cell::RefCell;
use std::rc::Rc;
use std::collections::VecDeque;
use bytes::BytesMut;
use body::Binary;
/// Internal use only! unsafe
#[derive(Debug)]
pub(crate) struct SharedBytesPool(RefCell<VecDeque<Rc<BytesMut>>>);
impl SharedBytesPool {
pub fn new() -> SharedBytesPool {
SharedBytesPool(RefCell::new(VecDeque::with_capacity(128)))
}
pub fn get_bytes(&self) -> Rc<BytesMut> {
if let Some(bytes) = self.0.borrow_mut().pop_front() {
bytes
} else {
Rc::new(BytesMut::new())
}
}
pub fn release_bytes(&self, mut bytes: Rc<BytesMut>) {
let v = &mut self.0.borrow_mut();
if v.len() < 128 {
Rc::get_mut(&mut bytes).unwrap().take();
v.push_front(bytes);
}
}
}
#[derive(Debug)]
pub(crate) struct SharedBytes(
Option<Rc<BytesMut>>, Option<Rc<SharedBytesPool>>);
impl Drop for SharedBytes {
fn drop(&mut self) {
if let Some(ref pool) = self.1 {
if let Some(bytes) = self.0.take() {
if Rc::strong_count(&bytes) == 1 {
pool.release_bytes(bytes);
}
}
}
}
}
impl SharedBytes {
pub fn empty() -> Self {
SharedBytes(None, None)
}
pub fn new(bytes: Rc<BytesMut>, pool: Rc<SharedBytesPool>) -> SharedBytes {
SharedBytes(Some(bytes), Some(pool))
}
#[inline(always)]
#[allow(mutable_transmutes)]
#[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref, inline_always))]
pub fn get_mut(&self) -> &mut BytesMut {
let r: &BytesMut = self.0.as_ref().unwrap().as_ref();
unsafe{mem::transmute(r)}
}
#[inline]
pub fn len(&self) -> usize {
self.0.as_ref().unwrap().len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.0.as_ref().unwrap().is_empty()
}
#[inline]
pub fn as_ref(&self) -> &[u8] {
self.0.as_ref().unwrap().as_ref()
}
pub fn split_to(&self, n: usize) -> BytesMut {
self.get_mut().split_to(n)
}
pub fn take(&self) -> BytesMut {
self.get_mut().take()
}
#[inline]
pub fn reserve(&self, cnt: usize) {
self.get_mut().reserve(cnt)
}
#[inline]
#[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))]
pub fn extend(&self, data: Binary) {
self.get_mut().extend_from_slice(data.as_ref());
}
#[inline]
pub fn extend_from_slice(&self, data: &[u8]) {
self.get_mut().extend_from_slice(data);
}
}
impl Default for SharedBytes {
fn default() -> Self {
SharedBytes(Some(Rc::new(BytesMut::new())), None)
}
}
impl Clone for SharedBytes {
fn clone(&self) -> SharedBytes {
SharedBytes(self.0.clone(), self.1.clone())
}
}

View File

@ -21,9 +21,7 @@ use native_tls::TlsAcceptor;
use tokio_tls::TlsStream;
#[cfg(feature="alpn")]
use openssl::ssl::{SslMethod, SslAcceptorBuilder};
#[cfg(feature="alpn")]
use openssl::pkcs12::ParsedPkcs12;
use openssl::ssl::{AlpnError, SslAcceptorBuilder};
#[cfg(feature="alpn")]
use tokio_openssl::SslStream;
@ -268,9 +266,9 @@ impl<H: HttpHandler, U, V> HttpServer<TcpStream, net::SocketAddr, H, U>
where U: IntoIterator<Item=V> + 'static,
V: IntoHttpHandler<Handler=H>,
{
/// Start listening for incomming connections.
/// Start listening for incoming connections.
///
/// This method starts number of http handler workers in seperate threads.
/// This method starts number of http handler workers in separate threads.
/// For each address this method starts separate thread which does `accept()` in a loop.
///
/// This methods panics if no socket addresses get bound.
@ -298,7 +296,7 @@ impl<H: HttpHandler, U, V> HttpServer<TcpStream, net::SocketAddr, H, U>
pub fn start(mut self) -> SyncAddress<Self>
{
if self.sockets.is_empty() {
panic!("HttpServer::bind() has to be called befor start()");
panic!("HttpServer::bind() has to be called before start()");
} else {
let addrs: Vec<(net::SocketAddr, net::TcpListener)> =
self.sockets.drain().collect();
@ -320,7 +318,7 @@ impl<H: HttpHandler, U, V> HttpServer<TcpStream, net::SocketAddr, H, U>
}
}
/// Spawn new thread and start listening for incomming connections.
/// Spawn new thread and start listening for incoming connections.
///
/// This method spawns new thread and starts new actix system. Other than that it is
/// similar to `start()` method. This method blocks.
@ -359,7 +357,7 @@ impl<H: HttpHandler, U, V> HttpServer<TlsStream<TcpStream>, net::SocketAddr, H,
where U: IntoIterator<Item=V> + 'static,
V: IntoHttpHandler<Handler=H>,
{
/// Start listening for incomming tls connections.
/// Start listening for incoming tls connections.
pub fn start_tls(mut self, pkcs12: ::Pkcs12) -> io::Result<SyncAddress<Self>> {
if self.sockets.is_empty() {
Err(io::Error::new(io::ErrorKind::Other, "No socket addresses are bound"))
@ -398,26 +396,28 @@ impl<H: HttpHandler, U, V> HttpServer<SslStream<TcpStream>, net::SocketAddr, H,
where U: IntoIterator<Item=V> + 'static,
V: IntoHttpHandler<Handler=H>,
{
/// Start listening for incomming tls connections.
/// Start listening for incoming tls connections.
///
/// This method sets alpn protocols to "h2" and "http/1.1"
pub fn start_ssl(mut self, identity: &ParsedPkcs12) -> io::Result<SyncAddress<Self>> {
pub fn start_ssl(mut self, mut builder: SslAcceptorBuilder) -> io::Result<SyncAddress<Self>>
{
if self.sockets.is_empty() {
Err(io::Error::new(io::ErrorKind::Other, "No socket addresses are bound"))
} else {
// alpn support
builder.set_alpn_protos(b"\x02h2\x08http/1.1")?;
builder.set_alpn_select_callback(|_, protos| {
const H2: &[u8] = b"\x02h2";
if protos.windows(3).any(|window| window == H2) {
Ok(b"h2")
} else {
Err(AlpnError::NOACK)
}
});
let acceptor = builder.build();
let addrs: Vec<(net::SocketAddr, net::TcpListener)> = self.sockets.drain().collect();
let settings = ServerSettings::new(Some(addrs[0].0), &self.host, false);
let acceptor = match SslAcceptorBuilder::mozilla_intermediate(
SslMethod::tls(), &identity.pkey, &identity.cert, &identity.chain)
{
Ok(mut builder) => {
match builder.set_alpn_protocols(&[b"h2", b"http/1.1"]) {
Ok(_) => builder.build(),
Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err)),
}
},
Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err))
};
let workers = self.start_workers(&settings, &StreamHandlerType::Alpn(acceptor));
// start acceptors threads
@ -443,7 +443,7 @@ impl<T, A, H, U, V> HttpServer<WrapperStream<T>, A, H, U>
U: IntoIterator<Item=V> + 'static,
V: IntoHttpHandler<Handler=H>,
{
/// Start listening for incomming connections from a stream.
/// Start listening for incoming connections from a stream.
///
/// This method uses only one thread for handling incoming connections.
pub fn start_incoming<S>(mut self, stream: S, secure: bool) -> SyncAddress<Self>
@ -663,7 +663,7 @@ fn start_accept_thread(sock: net::TcpListener, addr: net::SocketAddr, backlog: i
}
}
// Start listening for incommin commands
// Start listening for incoming commands
if let Err(err) = poll.register(&reg, CMD,
mio::Ready::readable(), mio::PollOpt::edge()) {
panic!("Can not register Registration: {}", err);

View File

@ -29,7 +29,7 @@ use server::{HttpServer, HttpHandler, IntoHttpHandler, ServerSettings};
/// The `TestServer` type.
///
/// `TestServer` is very simple test server that simplify process of writing
/// integrational tests cases for actix web applications.
/// integration tests cases for actix web applications.
///
/// # Examples
///
@ -61,7 +61,7 @@ impl TestServer {
/// Start new test server
///
/// This methos accepts configuration method. You can add
/// This method accepts configuration method. You can add
/// middlewares or set handlers for test application.
pub fn new<F>(config: F) -> Self
where F: Sync + Send + 'static + Fn(&mut TestApp<()>),
@ -101,7 +101,7 @@ impl TestServer {
/// Start new test server with custom application state
///
/// This methos accepts state factory and configuration method.
/// This method accepts state factory and configuration method.
pub fn with_state<S, FS, F>(state: FS, config: F) -> Self
where S: 'static,
FS: Sync + Send + 'static + Fn() -> S,
@ -287,12 +287,12 @@ impl Default for TestRequest<()> {
impl TestRequest<()> {
/// Create TestReqeust and set request uri
/// Create TestRequest and set request uri
pub fn with_uri(path: &str) -> TestRequest<()> {
TestRequest::default().uri(path)
}
/// Create TestReqeust and set header
/// Create TestRequest and set header
pub fn with_header<K, V>(key: K, value: V) -> TestRequest<()>
where HeaderName: HttpTryFrom<K>,
HeaderValue: HttpTryFrom<V>

View File

@ -23,7 +23,7 @@ use ws::proto::{OpCode, CloseCode};
pub struct WebsocketContext<A, S=()> where A: Actor<Context=WebsocketContext<A, S>>,
{
inner: ContextImpl<A>,
stream: Option<SmallVec<[ContextFrame; 2]>>,
stream: Option<SmallVec<[ContextFrame; 4]>>,
request: HttpRequest<S>,
disconnected: bool,
}
@ -55,6 +55,13 @@ impl<A, S> AsyncContext<A> for WebsocketContext<A, S> where A: Actor<Context=Sel
self.inner.wait(fut)
}
#[doc(hidden)]
#[inline]
fn waiting(&self) -> bool {
self.inner.waiting() || self.inner.state() == ActorState::Stopping ||
self.inner.state() == ActorState::Stopped
}
fn cancel_future(&mut self, handle: SpawnHandle) -> bool {
self.inner.cancel_future(handle)
}
@ -219,7 +226,7 @@ impl<A, S> ActorHttpContext for WebsocketContext<A, S> where A: Actor<Context=Se
self.stop();
}
fn poll(&mut self) -> Poll<Option<SmallVec<[ContextFrame;2]>>, Error> {
fn poll(&mut self) -> Poll<Option<SmallVec<[ContextFrame; 4]>>, Error> {
let ctx: &mut WebsocketContext<A, S> = unsafe {
mem::transmute(self as &mut WebsocketContext<A, S>)
};

View File

@ -5,14 +5,7 @@ use bytes::BytesMut;
use body::Binary;
use ws::proto::{OpCode, CloseCode};
fn apply_mask(buf: &mut [u8], mask: &[u8; 4]) {
let iter = buf.iter_mut().zip(mask.iter().cycle());
for (byte, &key) in iter {
*byte ^= key
}
}
use ws::mask::apply_mask;
/// A struct representing a `WebSocket` frame.
#[derive(Debug)]
@ -28,7 +21,7 @@ pub(crate) struct Frame {
impl Frame {
/// Desctructe frame
/// Destruct frame
pub fn unpack(self) -> (bool, OpCode, Binary) {
(self.finished, self.opcode, self.payload)
}

120
src/ws/mask.rs Normal file
View File

@ -0,0 +1,120 @@
//! This is code from [Tungstenite project](https://github.com/snapview/tungstenite-rs)
use std::cmp::min;
use std::mem::uninitialized;
use std::ptr::copy_nonoverlapping;
/// Mask/unmask a frame.
#[inline]
pub fn apply_mask(buf: &mut [u8], mask: &[u8; 4]) {
apply_mask_fast32(buf, mask)
}
/// A safe unoptimized mask application.
#[inline]
#[allow(dead_code)]
fn apply_mask_fallback(buf: &mut [u8], mask: &[u8; 4]) {
for (i, byte) in buf.iter_mut().enumerate() {
*byte ^= mask[i & 3];
}
}
/// Faster version of `apply_mask()` which operates on 4-byte blocks.
#[inline]
#[allow(dead_code)]
fn apply_mask_fast32(buf: &mut [u8], mask: &[u8; 4]) {
// TODO replace this with read_unaligned() as it stabilizes.
let mask_u32 = unsafe {
let mut m: u32 = uninitialized();
#[allow(trivial_casts)]
copy_nonoverlapping(mask.as_ptr(), &mut m as *mut _ as *mut u8, 4);
m
};
let mut ptr = buf.as_mut_ptr();
let mut len = buf.len();
// Possible first unaligned block.
let head = min(len, (4 - (ptr as usize & 3)) & 3);
let mask_u32 = if head > 0 {
unsafe {
xor_mem(ptr, mask_u32, head);
ptr = ptr.offset(head as isize);
}
len -= head;
if cfg!(target_endian = "big") {
mask_u32.rotate_left(8 * head as u32)
} else {
mask_u32.rotate_right(8 * head as u32)
}
} else {
mask_u32
};
if len > 0 {
debug_assert_eq!(ptr as usize % 4, 0);
}
// Properly aligned middle of the data.
while len > 4 {
unsafe {
*(ptr as *mut u32) ^= mask_u32;
ptr = ptr.offset(4);
len -= 4;
}
}
// Possible last block.
if len > 0 {
unsafe { xor_mem(ptr, mask_u32, len); }
}
}
#[inline]
// TODO: copy_nonoverlapping here compiles to call memcpy. While it is not so inefficient,
// it could be done better. The compiler does not see that len is limited to 3.
unsafe fn xor_mem(ptr: *mut u8, mask: u32, len: usize) {
let mut b: u32 = uninitialized();
#[allow(trivial_casts)]
copy_nonoverlapping(ptr, &mut b as *mut _ as *mut u8, len);
b ^= mask;
#[allow(trivial_casts)]
copy_nonoverlapping(&b as *const _ as *const u8, ptr, len);
}
#[cfg(test)]
mod tests {
use super::{apply_mask_fallback, apply_mask_fast32};
#[test]
fn test_apply_mask() {
let mask = [
0x6d, 0xb6, 0xb2, 0x80,
];
let unmasked = vec![
0xf3, 0x00, 0x01, 0x02, 0x03, 0x80, 0x81, 0x82,
0xff, 0xfe, 0x00, 0x17, 0x74, 0xf9, 0x12, 0x03,
];
// Check masking with proper alignment.
{
let mut masked = unmasked.clone();
apply_mask_fallback(&mut masked, &mask);
let mut masked_fast = unmasked.clone();
apply_mask_fast32(&mut masked_fast, &mask);
assert_eq!(masked, masked_fast);
}
// Check masking without alignment.
{
let mut masked = unmasked.clone();
apply_mask_fallback(&mut masked[1..], &mask);
let mut masked_fast = unmasked.clone();
apply_mask_fast32(&mut masked_fast[1..], &mask);
assert_eq!(masked, masked_fast);
}
}
}

View File

@ -58,6 +58,7 @@ use httpresponse::{ConnectionType, HttpResponse, HttpResponseBuilder};
mod frame;
mod proto;
mod context;
mod mask;
use ws::frame::Frame;
use ws::proto::{hash_key, OpCode};

View File

@ -152,6 +152,66 @@ fn test_body_br_streaming() {
assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref()));
}
#[test]
fn test_head_empty() {
let srv = test::TestServer::new(
|app| app.handler(|_| {
httpcodes::HTTPOk.build()
.content_length(STR.len() as u64).finish()}));
let client = reqwest::Client::new();
let mut res = client.head(&srv.url("/")).send().unwrap();
assert!(res.status().is_success());
let mut bytes = BytesMut::with_capacity(2048).writer();
let len = res.headers()
.get::<reqwest::header::ContentLength>().map(|ct_len| **ct_len).unwrap();
assert_eq!(len, STR.len() as u64);
let _ = res.copy_to(&mut bytes);
let bytes = bytes.into_inner();
assert!(bytes.is_empty());
}
#[test]
fn test_head_binary() {
let srv = test::TestServer::new(
|app| app.handler(|_| {
httpcodes::HTTPOk.build()
.content_encoding(headers::ContentEncoding::Identity)
.content_length(100).body(STR)}));
let client = reqwest::Client::new();
let mut res = client.head(&srv.url("/")).send().unwrap();
assert!(res.status().is_success());
let mut bytes = BytesMut::with_capacity(2048).writer();
let len = res.headers()
.get::<reqwest::header::ContentLength>().map(|ct_len| **ct_len).unwrap();
assert_eq!(len, STR.len() as u64);
let _ = res.copy_to(&mut bytes);
let bytes = bytes.into_inner();
assert!(bytes.is_empty());
}
#[test]
fn test_head_binary2() {
let srv = test::TestServer::new(
|app| app.handler(|_| {
httpcodes::HTTPOk.build()
.content_encoding(headers::ContentEncoding::Identity)
.body(STR)
}));
let client = reqwest::Client::new();
let mut res = client.head(&srv.url("/")).send().unwrap();
assert!(res.status().is_success());
let mut bytes = BytesMut::with_capacity(2048).writer();
let len = res.headers()
.get::<reqwest::header::ContentLength>().map(|ct_len| **ct_len).unwrap();
assert_eq!(len, STR.len() as u64);
let _ = res.copy_to(&mut bytes);
let bytes = bytes.into_inner();
assert!(bytes.is_empty());
}
#[test]
fn test_body_length() {
let srv = test::TestServer::new(