1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-07-23 17:28:19 +02:00

Compare commits

...

80 Commits

Author SHA1 Message Date
Nikolay Kim
1fe4315c94 use actix 0.5.4 2018-03-16 13:37:47 -07:00
Nikolay Kim
381b90e9a1 bump version 2018-03-16 12:31:29 -07:00
Nikolay Kim
2d18dba40a fix compilation 2018-03-16 12:28:08 -07:00
Nikolay Kim
d2693d58a8 clippy warnings 2018-03-16 12:12:55 -07:00
Nikolay Kim
84bf282c17 add basic client connection pooling 2018-03-16 12:04:01 -07:00
Nikolay Kim
b15b5e5246 check number of available workers 2018-03-16 11:17:27 -07:00
Douman
52b3b0c362 Merge pull request #119 from DoumanAsh/default_static_files
Add default resource for StaticFiles
2018-03-16 20:12:07 +03:00
Nikolay Kim
64c4cefa8f Merge branch 'master' into default_static_files 2018-03-16 09:31:36 -07:00
Nikolay Kim
7e8b231f57 disable test 2018-03-16 09:13:36 -07:00
Douman
8a344d0c94 Add default resource for StaticFiles 2018-03-16 19:04:36 +03:00
Nikolay Kim
4096089a3f allow to disable http/2 support 2018-03-16 08:48:44 -07:00
Nikolay Kim
b16f2d5f05 proper check for actor context poll 2018-03-16 08:04:26 -07:00
Nikolay Kim
5baf15822a always start actors 2018-03-16 07:46:27 -07:00
Nikolay Kim
5368ce823e Merge pull request #123 from h416/patch-1
fix typo
2018-03-16 05:31:10 -07:00
h416
4effdf065b fix typo 2018-03-16 19:03:16 +09:00
Nikolay Kim
61970ab190 always poll stream or actor for the first time 2018-03-15 17:11:49 -07:00
Nikolay Kim
484b00a0f9 Merge branch 'master' of github.com:actix/actix-web 2018-03-15 16:55:33 -07:00
Nikolay Kim
73bf2068aa allow to use NamedFile with any request method 2018-03-15 16:55:22 -07:00
Nikolay Kim
1cda949204 Merge pull request #122 from mockersf/test_qp
test for query parameters in client
2018-03-14 16:10:31 -07:00
François Mockers
ad6b823255 test for query parameters in client 2018-03-14 21:45:49 +01:00
Nikolay Kim
0f064db31d Move brotli encoding to a feature 2018-03-13 17:21:22 -07:00
Nikolay Kim
fd0bb54469 add debug formatter for ClientRequestBuilder 2018-03-13 15:09:05 -07:00
Nikolay Kim
e27bbaa55c Update CHANGES.md 2018-03-13 13:15:21 -07:00
Nikolay Kim
8a50eae1e2 Merge pull request #121 from glademiller/master
Send Query Parameters in client requests
2018-03-13 13:14:51 -07:00
Glade Miller
38080f67b3 If no path is available from the URI request / 2018-03-13 13:35:11 -06:00
Glade Miller
08504e0892 Move path call inline into write 2018-03-13 13:26:13 -06:00
Glade Miller
401c0ad809 https://github.com/actix/actix-web/issues/120 - Send Query Parameters in client requests 2018-03-13 13:17:55 -06:00
Nikolay Kim
b4b0deb7fa Wake payload reading task when data is available 2018-03-12 16:29:13 -07:00
Nikolay Kim
05ff35d383 Fix server keep-alive handling 2018-03-12 16:16:17 -07:00
Nikolay Kim
29c3e8f7ea update test 2018-03-12 10:19:09 -07:00
Nikolay Kim
6657446433 Allow to set read buffer capacity for server request 2018-03-12 10:01:56 -07:00
Nikolay Kim
46b9a9c887 update readme 2018-03-12 09:13:04 -07:00
Nikolay Kim
b3cdb472d0 remove reserved state for h2 write if buffer is empty 2018-03-12 09:04:54 -07:00
Nikolay Kim
31e1aab9a4 do not log WouldBlock error from socket accept 2018-03-12 09:02:15 -07:00
Nikolay Kim
67f383f346 typo 2018-03-11 16:53:46 -07:00
Nikolay Kim
49f5c335f6 better sleep on error 2018-03-11 16:52:20 -07:00
Nikolay Kim
692e11a584 bump version 2018-03-11 16:40:25 -07:00
Nikolay Kim
208117ca6f Merge pull request #118 from messense/feature/sockets-vec
Use Vec instead of HashMap to store sockets in HttpServer
2018-03-11 16:38:23 -07:00
Nikolay Kim
3e276ac921 Merge branch 'master' into feature/sockets-vec 2018-03-11 16:38:17 -07:00
Nikolay Kim
4af115a19c Fix streaming response handling for http/2 2018-03-11 16:37:44 -07:00
Nikolay Kim
051703eb2c Fix connection get closed too early 2018-03-11 15:37:33 -07:00
Nikolay Kim
31fbbd3168 Fix panic on unknown content encoding 2018-03-11 14:50:13 -07:00
Nikolay Kim
fee1e255ac add comments 2018-03-11 10:10:30 -07:00
Nikolay Kim
a4c933e56e update doc string 2018-03-11 09:36:54 -07:00
Nikolay Kim
9ddf5a3550 better doc string for Either 2018-03-11 09:28:22 -07:00
messense
9ab0fa604d Use Vec instead of HashMap to store sockets in HttpServer 2018-03-11 17:29:44 +08:00
Nikolay Kim
6c709b33cc return error on write zero bytes 2018-03-10 10:42:46 -08:00
Nikolay Kim
71b4c07ea4 Fix json content type detection 2018-03-10 10:27:29 -08:00
Nikolay Kim
ac9eba8261 add api doc for Either 2018-03-10 10:12:44 -08:00
Nikolay Kim
cad55f9c80 add Either responder 2018-03-10 09:39:43 -08:00
Nikolay Kim
4263574a58 fix panic in cors if request does not contain origin header and send_wildcard is not set 2018-03-10 08:31:20 -08:00
messense
84ef5ee410 Merge pull request #116 from messense/feature/from-usize-to-keepalive
Impl From<usize> and From<Option<usize>> for KeepAlive
2018-03-10 22:55:55 +08:00
messense
598fb9190d rerun build if USE_SKEPTIC env var changed 2018-03-10 17:53:11 +08:00
messense
9a404a0c03 Impl From<usize> and From<Option<usize>> for KeepAlive 2018-03-10 17:52:50 +08:00
Nikolay Kim
3dd8fdf450 fix guide 2018-03-09 21:40:51 -08:00
Nikolay Kim
05f5ba0084 refactor keep-alive; fixed write to socket for upgraded connection 2018-03-09 16:21:14 -08:00
Nikolay Kim
8169149554 update wstool 2018-03-09 13:12:25 -08:00
Nikolay Kim
8d1de6c497 ws client timeouts 2018-03-09 13:12:14 -08:00
Nikolay Kim
caaace82e3 export symbols 2018-03-09 13:03:15 -08:00
Nikolay Kim
02dd5375a9 align mask to 8 bytes 2018-03-09 10:25:47 -08:00
Nikolay Kim
717602472a clippy warnings 2018-03-09 10:11:38 -08:00
Nikolay Kim
b56be8e571 write buffer capacity for client 2018-03-09 10:09:13 -08:00
Nikolay Kim
2853086463 add write buffer capacity config 2018-03-09 10:00:15 -08:00
Nikolay Kim
e2107ec6f4 use small vec on hot path 2018-03-09 08:00:44 -08:00
Nikolay Kim
c33caddf57 update tests 2018-03-09 05:50:47 -08:00
Nikolay Kim
db1e04e418 prepare release 2018-03-09 05:42:42 -08:00
Nikolay Kim
f8b8fe3865 add space to cookie header 2018-03-09 05:38:07 -08:00
Nikolay Kim
1c6ddfd34c naming 2018-03-09 05:36:40 -08:00
Nikolay Kim
49e007ff2a move protobuf support to the example 2018-03-09 05:29:06 -08:00
Nikolay Kim
2068eee669 update readme 2018-03-08 20:58:05 -08:00
Nikolay Kim
f3c63e631a add protobuf feature 2018-03-08 20:56:18 -08:00
Nikolay Kim
3f0803a7d3 Merge branch 'master' of github.com:actix/actix-web 2018-03-08 20:39:10 -08:00
Nikolay Kim
f12b613211 more ws optimizations 2018-03-08 20:39:05 -08:00
Nikolay Kim
695c052c58 Merge pull request #115 from kingxsp/master
Add protobuf support
2018-03-08 18:59:18 -08:00
kingxsp
63634be542 Merge branch 'master' into master 2018-03-09 10:22:15 +08:00
Nikolay Kim
f88f1c65b6 update tests 2018-03-08 18:19:46 -08:00
kingxsp
a0b589eb96 Add protobuf support 2018-03-09 10:05:13 +08:00
Nikolay Kim
ebdc983dfe optimize websocket stream 2018-03-08 17:19:50 -08:00
Nikolay Kim
395243a539 another attempt to fix cookie handling 2018-03-08 11:16:54 -08:00
Nikolay Kim
1ab676d7eb bump version and add some tests 2018-03-07 22:40:46 -08:00
52 changed files with 1898 additions and 615 deletions

View File

@@ -59,6 +59,7 @@ script:
cd examples/multipart && cargo check && cd ../..
cd examples/json && cargo check && cd ../..
cd examples/juniper && cargo check && cd ../..
cd examples/protobuf && cargo check && cd ../..
cd examples/state && cargo check && cd ../..
cd examples/template_tera && cargo check && cd ../..
cd examples/diesel && cargo check && cd ../..
@@ -77,7 +78,7 @@ script:
after_success:
- |
if [[ "$TRAVIS_OS_NAME" == "linux" && "$TRAVIS_PULL_REQUEST" = "false" && "$TRAVIS_BRANCH" == "master" && "$TRAVIS_RUST_VERSION" == "nightly" ]]; then
cargo doc --features "alpn, tls" --no-deps &&
cargo doc --features "alpn, tls, session" --no-deps &&
echo "<meta http-equiv=refresh content=0;url=os_balloon/index.html>" > target/doc/index.html &&
cargo install mdbook &&
cd guide && mdbook build -d ../target/doc/guide && cd .. &&

View File

@@ -1,5 +1,51 @@
# Changes
## 0.4.9 (2018-03-16)
* Allow to disable http/2 support
* Wake payload reading task when data is available
* Fix server keep-alive handling
* Send Query Parameters in client requests #120
* Move brotli encoding to a feature
* Add option of default handler for `StaticFiles` handler #57
* Add basic client connection pooling
## 0.4.8 (2018-03-12)
* Allow to set read buffer capacity for server request
* Handle WouldBlock error for socket accept call
## 0.4.7 (2018-03-11)
* Fix panic on unknown content encoding
* Fix connection get closed too early
* Fix streaming response handling for http/2
* Better sleep on error support
## 0.4.6 (2018-03-10)
* Fix client cookie handling
* Fix json content type detection
* Fix CORS middleware #117
* Optimize websockets stream support
## 0.4.5 (2018-03-07)
* Fix compression #103 and #104
@@ -12,9 +58,8 @@
* Better support for `NamedFile` type
* Add `ResponseError` impl for `SendRequestError`.
This improves ergonomics of http client.
* Add `ResponseError` impl for `SendRequestError`. This improves ergonomics of the client.
* Add native-tls support for client
* Allow client connection timeout to be set #108

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-web"
version = "0.4.5"
version = "0.4.9"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix web is a simple, pragmatic, extremely fast, web framework for Rust."
readme = "README.md"
@@ -27,7 +27,7 @@ name = "actix_web"
path = "src/lib.rs"
[features]
default = ["session"]
default = ["session", "brotli"]
# tls
tls = ["native-tls", "tokio-tls"]
@@ -38,12 +38,14 @@ alpn = ["openssl", "openssl/v102", "openssl/v110", "tokio-openssl"]
# sessions
session = ["cookie/secure"]
# brotli encoding
brotli = ["brotli2"]
[dependencies]
actix = "^0.5.2"
actix = "^0.5.4"
base64 = "0.9"
bitflags = "1.0"
brotli2 = "^0.3.2"
failure = "0.1.1"
flate2 = "1.0"
h2 = "0.1"
@@ -67,6 +69,7 @@ encoding = "0.2"
language-tags = "0.2"
url = { version="1.7", features=["query_encoding"] }
cookie = { version="0.10", features=["percent-encode"] }
brotli2 = { version="^0.3.2", optional = true }
# io
mio = "^0.6.13"
@@ -109,6 +112,7 @@ members = [
"examples/diesel",
"examples/r2d2",
"examples/json",
"examples/protobuf",
"examples/hello-world",
"examples/http-proxy",
"examples/multipart",

View File

@@ -10,6 +10,7 @@ Actix web is a simple, pragmatic, extremely fast, web framework for Rust.
* Configurable [request routing](https://actix.github.io/actix-web/guide/qs_5.html)
* Graceful server shutdown
* Multipart streams
* Static assets
* SSL support with openssl or native-tls
* Middlewares ([Logger](https://actix.github.io/actix-web/guide/qs_10.html#logging),
[Session](https://actix.github.io/actix-web/guide/qs_10.html#user-sessions),
@@ -30,7 +31,7 @@ Actix web is a simple, pragmatic, extremely fast, web framework for Rust.
## Example
```rust,ignore
```rust
extern crate actix_web;
use actix_web::*;
@@ -51,13 +52,13 @@ fn main() {
* [Basics](https://github.com/actix/actix-web/tree/master/examples/basics/)
* [Stateful](https://github.com/actix/actix-web/tree/master/examples/state/)
* [Protobuf support](https://github.com/actix/actix-web/tree/master/examples/protobuf/)
* [Multipart streams](https://github.com/actix/actix-web/tree/master/examples/multipart/)
* [Simple websocket session](https://github.com/actix/actix-web/tree/master/examples/websocket/)
* [Tera templates](https://github.com/actix/actix-web/tree/master/examples/template_tera/)
* [Diesel integration](https://github.com/actix/actix-web/tree/master/examples/diesel/)
* [SSL / HTTP/2.0](https://github.com/actix/actix-web/tree/master/examples/tls/)
* [Tcp/Websocket chat](https://github.com/actix/actix-web/tree/master/examples/websocket-chat/)
* [SockJS Server](https://github.com/actix/actix-sockjs)
* [Json](https://github.com/actix/actix-web/tree/master/examples/json/)
You may consider checking out

View File

@@ -6,12 +6,13 @@ use std::{env, fs};
#[cfg(unix)]
fn main() {
println!("cargo:rerun-if-env-changed=USE_SKEPTIC");
let f = env::var("OUT_DIR").unwrap() + "/skeptic-tests.rs";
if env::var("USE_SKEPTIC").is_ok() {
let _ = fs::remove_file(f);
// generates doc tests for `README.md`.
skeptic::generate_doc_tests(
&["README.md",
&[// "README.md",
"guide/src/qs_1.md",
"guide/src/qs_2.md",
"guide/src/qs_3.md",

View File

@@ -1,8 +1,8 @@
//! Actix web diesel example
//!
//! Diesel does not support tokio, so we have to run it in separate threads.
//! Actix supports sync actors by default, so we going to create sync actor that will
//! use diesel. Technically sync actors are worker style actors, multiple of them
//! Actix supports sync actors by default, so we going to create sync actor that use diesel.
//! Technically sync actors are worker style actors, multiple of them
//! can run in parallel and process messages from same queue.
extern crate serde;
extern crate serde_json;
@@ -38,6 +38,7 @@ struct State {
fn index(req: HttpRequest<State>) -> Box<Future<Item=HttpResponse, Error=Error>> {
let name = &req.match_info()["name"];
// send async `CreateUser` message to a `DbExecutor`
req.state().db.send(CreateUser{name: name.to_owned()})
.from_err()
.and_then(|res| {
@@ -54,7 +55,7 @@ fn main() {
let _ = env_logger::init();
let sys = actix::System::new("diesel-example");
// Start db executor actors
// Start 3 db executor actors
let addr = SyncArbiter::start(3, || {
DbExecutor(SqliteConnection::establish("test.db").unwrap())
});

View File

@@ -0,0 +1,16 @@
[package]
name = "protobuf-example"
version = "0.1.0"
authors = ["kingxsp <jin_hb_zh@126.com>"]
[dependencies]
bytes = "0.4"
futures = "0.1"
failure = "0.1"
env_logger = "*"
prost = "0.2.0"
prost-derive = "0.2.0"
actix = "0.5"
actix-web = { path="../../" }

View File

@@ -0,0 +1,66 @@
# just start server and run client.py
# wget https://github.com/google/protobuf/releases/download/v3.5.1/protobuf-python-3.5.1.zip
# unzip protobuf-python-3.5.1.zip.1
# cd protobuf-3.5.1/python/
# python3.6 setup.py install
# pip3.6 install --upgrade pip
# pip3.6 install aiohttp
#!/usr/bin/env python
import test_pb2
import traceback
import sys
import asyncio
import aiohttp
def op():
try:
obj = test_pb2.MyObj()
obj.number = 9
obj.name = 'USB'
#Serialize
sendDataStr = obj.SerializeToString()
#print serialized string value
print('serialized string:', sendDataStr)
#------------------------#
# message transmission #
#------------------------#
receiveDataStr = sendDataStr
receiveData = test_pb2.MyObj()
#Deserialize
receiveData.ParseFromString(receiveDataStr)
print('pares serialize string, return: devId = ', receiveData.number, ', name = ', receiveData.name)
except(Exception, e):
print(Exception, ':', e)
print(traceback.print_exc())
errInfo = sys.exc_info()
print(errInfo[0], ':', errInfo[1])
async def fetch(session):
obj = test_pb2.MyObj()
obj.number = 9
obj.name = 'USB'
async with session.post('http://localhost:8080/', data=obj.SerializeToString(),
headers={"content-type": "application/protobuf"}) as resp:
print(resp.status)
data = await resp.read()
receiveObj = test_pb2.MyObj()
receiveObj.ParseFromString(data)
print(receiveObj)
async def go(loop):
obj = test_pb2.MyObj()
obj.number = 9
obj.name = 'USB'
async with aiohttp.ClientSession(loop=loop) as session:
await fetch(session)
loop = asyncio.get_event_loop()
loop.run_until_complete(go(loop))
loop.close()

View File

@@ -0,0 +1,55 @@
extern crate actix;
extern crate actix_web;
extern crate bytes;
extern crate futures;
#[macro_use]
extern crate failure;
extern crate env_logger;
extern crate prost;
#[macro_use]
extern crate prost_derive;
use actix_web::*;
use futures::Future;
mod protobuf;
use protobuf::ProtoBufResponseBuilder;
#[derive(Clone, Debug, PartialEq, Message)]
pub struct MyObj {
#[prost(int32, tag="1")]
pub number: i32,
#[prost(string, tag="2")]
pub name: String,
}
/// This handler uses `ProtoBufMessage` for loading protobuf object.
fn index(req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>> {
protobuf::ProtoBufMessage::new(req)
.from_err() // convert all errors into `Error`
.and_then(|val: MyObj| {
println!("model: {:?}", val);
Ok(httpcodes::HTTPOk.build().protobuf(val)?) // <- send response
})
.responder()
}
fn main() {
::std::env::set_var("RUST_LOG", "actix_web=info");
let _ = env_logger::init();
let sys = actix::System::new("protobuf-example");
let addr = HttpServer::new(|| {
Application::new()
.middleware(middleware::Logger::default())
.resource("/", |r| r.method(Method::POST).f(index))})
.bind("127.0.0.1:8080").unwrap()
.shutdown_timeout(1)
.start();
println!("Started http server: 127.0.0.1:8080");
let _ = sys.run();
}

View File

@@ -0,0 +1,169 @@
use bytes::{Bytes, BytesMut};
use futures::{Poll, Future, Stream};
use bytes::IntoBuf;
use prost::Message;
use prost::DecodeError as ProtoBufDecodeError;
use prost::EncodeError as ProtoBufEncodeError;
use actix_web::header::http::{CONTENT_TYPE, CONTENT_LENGTH};
use actix_web::{Responder, HttpMessage, HttpRequest, HttpResponse};
use actix_web::dev::HttpResponseBuilder;
use actix_web::error::{Error, PayloadError, ResponseError};
use actix_web::httpcodes::{HttpBadRequest, HttpPayloadTooLarge};
#[derive(Fail, Debug)]
pub enum ProtoBufPayloadError {
/// Payload size is bigger than 256k
#[fail(display="Payload size is bigger than 256k")]
Overflow,
/// Content type error
#[fail(display="Content type error")]
ContentType,
/// Serialize error
#[fail(display="ProtoBud serialize error: {}", _0)]
Serialize(#[cause] ProtoBufEncodeError),
/// Deserialize error
#[fail(display="ProtoBud deserialize error: {}", _0)]
Deserialize(#[cause] ProtoBufDecodeError),
/// Payload error
#[fail(display="Error that occur during reading payload: {}", _0)]
Payload(#[cause] PayloadError),
}
impl ResponseError for ProtoBufPayloadError {
fn error_response(&self) -> HttpResponse {
match *self {
ProtoBufPayloadError::Overflow => HttpPayloadTooLarge.into(),
_ => HttpBadRequest.into(),
}
}
}
impl From<PayloadError> for ProtoBufPayloadError {
fn from(err: PayloadError) -> ProtoBufPayloadError {
ProtoBufPayloadError::Payload(err)
}
}
impl From<ProtoBufDecodeError> for ProtoBufPayloadError {
fn from(err: ProtoBufDecodeError) -> ProtoBufPayloadError {
ProtoBufPayloadError::Deserialize(err)
}
}
#[derive(Debug)]
pub struct ProtoBuf<T: Message>(pub T);
impl<T: Message> Responder for ProtoBuf<T> {
type Item = HttpResponse;
type Error = Error;
fn respond_to(self, _: HttpRequest) -> Result<HttpResponse, Error> {
let mut buf = Vec::new();
self.0.encode(&mut buf)
.map_err(|e| Error::from(ProtoBufPayloadError::Serialize(e)))
.and_then(|()| {
Ok(HttpResponse::Ok()
.content_type("application/protobuf")
.body(buf)
.into())
})
}
}
pub struct ProtoBufMessage<T, U: Message + Default>{
limit: usize,
ct: &'static str,
req: Option<T>,
fut: Option<Box<Future<Item=U, Error=ProtoBufPayloadError>>>,
}
impl<T, U: Message + Default> ProtoBufMessage<T, U> {
/// Create `ProtoBufMessage` for request.
pub fn new(req: T) -> Self {
ProtoBufMessage{
limit: 262_144,
req: Some(req),
fut: None,
ct: "application/protobuf",
}
}
/// Change max size of payload. By default max size is 256Kb
pub fn limit(mut self, limit: usize) -> Self {
self.limit = limit;
self
}
/// Set allowed content type.
///
/// By default *application/protobuf* content type is used. Set content type
/// to empty string if you want to disable content type check.
pub fn content_type(mut self, ct: &'static str) -> Self {
self.ct = ct;
self
}
}
impl<T, U: Message + Default + 'static> Future for ProtoBufMessage<T, U>
where T: HttpMessage + Stream<Item=Bytes, Error=PayloadError> + 'static
{
type Item = U;
type Error = ProtoBufPayloadError;
fn poll(&mut self) -> Poll<U, ProtoBufPayloadError> {
if let Some(req) = self.req.take() {
if let Some(len) = req.headers().get(CONTENT_LENGTH) {
if let Ok(s) = len.to_str() {
if let Ok(len) = s.parse::<usize>() {
if len > self.limit {
return Err(ProtoBufPayloadError::Overflow);
}
} else {
return Err(ProtoBufPayloadError::Overflow);
}
}
}
// check content-type
if !self.ct.is_empty() && req.content_type() != self.ct {
return Err(ProtoBufPayloadError::ContentType)
}
let limit = self.limit;
let fut = req.from_err()
.fold(BytesMut::new(), move |mut body, chunk| {
if (body.len() + chunk.len()) > limit {
Err(ProtoBufPayloadError::Overflow)
} else {
body.extend_from_slice(&chunk);
Ok(body)
}
})
.and_then(|body| Ok(<U>::decode(&mut body.into_buf())?));
self.fut = Some(Box::new(fut));
}
self.fut.as_mut().expect("ProtoBufBody could not be used second time").poll()
}
}
pub trait ProtoBufResponseBuilder {
fn protobuf<T: Message>(&mut self, value: T) -> Result<HttpResponse, Error>;
}
impl ProtoBufResponseBuilder for HttpResponseBuilder {
fn protobuf<T: Message>(&mut self, value: T) -> Result<HttpResponse, Error> {
self.header(CONTENT_TYPE, "application/protobuf");
let mut body = Vec::new();
value.encode(&mut body).map_err(|e| ProtoBufPayloadError::Serialize(e))?;
Ok(self.body(body)?)
}
}

View File

@@ -0,0 +1,6 @@
syntax = "proto3";
message MyObj {
int32 number = 1;
string name = 2;
}

View File

@@ -0,0 +1,76 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: test.proto
import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
from google.protobuf import descriptor_pb2
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='test.proto',
package='',
syntax='proto3',
serialized_pb=_b('\n\ntest.proto\"%\n\x05MyObj\x12\x0e\n\x06number\x18\x01 \x01(\x05\x12\x0c\n\x04name\x18\x02 \x01(\tb\x06proto3')
)
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
_MYOBJ = _descriptor.Descriptor(
name='MyObj',
full_name='MyObj',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='number', full_name='MyObj.number', index=0,
number=1, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='name', full_name='MyObj.name', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=14,
serialized_end=51,
)
DESCRIPTOR.message_types_by_name['MyObj'] = _MYOBJ
MyObj = _reflection.GeneratedProtocolMessageType('MyObj', (_message.Message,), dict(
DESCRIPTOR = _MYOBJ,
__module__ = 'test_pb2'
# @@protoc_insertion_point(class_scope:MyObj)
))
_sym_db.RegisterMessage(MyObj)
# @@protoc_insertion_point(module_scope)

View File

@@ -1,4 +1,4 @@
# websockect
# websocket
Simple echo websocket server.

View File

@@ -134,9 +134,10 @@ for full example.
Actix can wait for requests on a keep-alive connection. *Keep alive*
connection behavior is defined by server settings.
* `Some(75)` - enable 75 sec *keep alive* timer according request and response settings.
* `Some(0)` - disable *keep alive*.
* `None` - Use `SO_KEEPALIVE` socket option.
* `75` or `Some(75)` or `KeepAlive::Timeout(75)` - enable 75 sec *keep alive* timer according
request and response settings.
* `None` or `KeepAlive::Disabled` - disable *keep alive*.
* `KeepAlive::Tcp(75)` - Use `SO_KEEPALIVE` socket option.
```rust
# extern crate actix_web;
@@ -147,7 +148,17 @@ fn main() {
HttpServer::new(||
Application::new()
.resource("/", |r| r.h(httpcodes::HttpOk)))
.keep_alive(None); // <- Use `SO_KEEPALIVE` socket option.
.keep_alive(75); // <- Set keep-alive to 75 seconds
HttpServer::new(||
Application::new()
.resource("/", |r| r.h(httpcodes::HttpOk)))
.keep_alive(server::KeepAlive::Tcp(75)); // <- Use `SO_KEEPALIVE` socket option.
HttpServer::new(||
Application::new()
.resource("/", |r| r.h(httpcodes::HttpOk)))
.keep_alive(None); // <- Disable keep-alive
}
```

View File

@@ -235,6 +235,44 @@ fn main() {
Both methods could be combined. (i.e Async response with streaming body)
## Different return types (Either)
Sometimes you need to return different types of responses. For example
you can do error check and return error and return async response otherwise.
Or any result that requires two different types.
For this case [*Either*](../actix_web/enum.Either.html) type can be used.
*Either* allows to combine two different responder types into a single type.
```rust
# extern crate actix_web;
# extern crate futures;
# use actix_web::*;
# use futures::future::Future;
use futures::future::result;
use actix_web::{Either, Error, HttpResponse, httpcodes};
type RegisterResult = Either<HttpResponse, Box<Future<Item=HttpResponse, Error=Error>>>;
fn index(req: HttpRequest) -> RegisterResult {
if is_a_variant() { // <- choose variant A
Either::A(
httpcodes::HttpBadRequest.with_body("Bad data"))
} else {
Either::B( // <- variant B
result(HttpResponse::Ok()
.content_type("text/html")
.body(format!("Hello!"))
.map_err(|e| e.into())).responder())
}
}
# fn is_a_variant() -> bool { true }
# fn main() {
# Application::new()
# .resource("/register", |r| r.f(index))
# .finish();
# }
```
## Tokio core handle
Any actix web handler runs within properly configured

View File

@@ -1,15 +1,18 @@
use std::{io, time};
use std::{fmt, io, time};
use std::cell::RefCell;
use std::rc::Rc;
use std::net::Shutdown;
use std::time::Duration;
use std::time::{Duration, Instant};
use std::collections::{HashMap, VecDeque};
use actix::{fut, Actor, ActorFuture, Context,
use actix::{fut, Actor, ActorFuture, Context, AsyncContext,
Handler, Message, ActorResponse, Supervised};
use actix::registry::ArbiterService;
use actix::fut::WrapFuture;
use actix::actors::{Connector, ConnectorError, Connect as ResolveConnect};
use http::{Uri, HttpTryFrom, Error as HttpError};
use futures::Poll;
use futures::{Async, Poll};
use tokio_io::{AsyncRead, AsyncWrite};
#[cfg(feature="alpn")]
@@ -77,7 +80,7 @@ pub enum ClientConnectorError {
#[fail(display = "{}", _0)]
Connector(#[cause] ConnectorError),
/// Connecting took too long
/// Connection took too long
#[fail(display = "Timeout out while establishing connection")]
Timeout,
@@ -104,10 +107,15 @@ pub struct ClientConnector {
connector: SslConnector,
#[cfg(all(feature="tls", not(feature="alpn")))]
connector: TlsConnector,
pool: Rc<Pool>,
}
impl Actor for ClientConnector {
type Context = Context<ClientConnector>;
fn started(&mut self, ctx: &mut Self::Context) {
self.collect(ctx);
}
}
impl Supervised for ClientConnector {}
@@ -120,19 +128,21 @@ impl Default for ClientConnector {
{
let builder = SslConnector::builder(SslMethod::tls()).unwrap();
ClientConnector {
connector: builder.build()
connector: builder.build(),
pool: Rc::new(Pool::new()),
}
}
#[cfg(all(feature="tls", not(feature="alpn")))]
{
let builder = TlsConnector::builder().unwrap();
ClientConnector {
connector: builder.build().unwrap()
connector: builder.build().unwrap(),
pool: Rc::new(Pool::new()),
}
}
#[cfg(not(any(feature="alpn", feature="tls")))]
ClientConnector {}
ClientConnector {pool: Rc::new(Pool::new())}
}
}
@@ -182,7 +192,12 @@ impl ClientConnector {
/// }
/// ```
pub fn with_connector(connector: SslConnector) -> ClientConnector {
ClientConnector { connector }
ClientConnector { connector, pool: Rc::new(Pool::new()) }
}
fn collect(&mut self, ctx: &mut Context<Self>) {
self.pool.collect();
ctx.run_later(Duration::from_secs(1), |act, ctx| act.collect(ctx));
}
}
@@ -214,10 +229,21 @@ impl Handler<Connect> for ClientConnector {
let host = uri.host().unwrap().to_owned();
let port = uri.port().unwrap_or_else(|| proto.port());
let key = Key {host, port, ssl: proto.is_secure()};
let pool = if proto.is_http() {
if let Some(conn) = self.pool.query(&key) {
return ActorResponse::async(fut::ok(conn))
} else {
Some(Rc::clone(&self.pool))
}
} else {
None
};
ActorResponse::async(
Connector::from_registry()
.send(ResolveConnect::host_and_port(&host, port)
.send(ResolveConnect::host_and_port(&key.host, port)
.timeout(conn_timeout))
.into_actor(self)
.map_err(|_, _, _| ClientConnectorError::Disconnected)
@@ -228,12 +254,14 @@ impl Handler<Connect> for ClientConnector {
Ok(stream) => {
if proto.is_secure() {
fut::Either::A(
_act.connector.connect_async(&host, stream)
_act.connector.connect_async(&key.host, stream)
.map_err(ClientConnectorError::SslError)
.map(|stream| Connection{stream: Box::new(stream)})
.map(|stream| Connection::new(
key, pool, Box::new(stream)))
.into_actor(_act))
} else {
fut::Either::B(fut::ok(Connection{stream: Box::new(stream)}))
fut::Either::B(fut::ok(
Connection::new(key, pool, Box::new(stream))))
}
}
}
@@ -244,12 +272,14 @@ impl Handler<Connect> for ClientConnector {
Ok(stream) => {
if proto.is_secure() {
fut::Either::A(
_act.connector.connect_async(&host, stream)
_act.connector.connect_async(&key.host, stream)
.map_err(ClientConnectorError::SslError)
.map(|stream| Connection{stream: Box::new(stream)})
.map(|stream| Connection::new(
key, pool, Box::new(stream)))
.into_actor(_act))
} else {
fut::Either::B(fut::ok(Connection{stream: Box::new(stream)}))
fut::Either::B(fut::ok(
Connection::new(key, pool, Box::new(stream))))
}
}
}
@@ -261,7 +291,7 @@ impl Handler<Connect> for ClientConnector {
if proto.is_secure() {
fut::err(ClientConnectorError::SslIsNotSupported)
} else {
fut::ok(Connection{stream: Box::new(stream)})
fut::ok(Connection::new(key, pool, Box::new(stream)))
}
}
}
@@ -288,6 +318,13 @@ impl Protocol {
}
}
fn is_http(&self) -> bool {
match *self {
Protocol::Https | Protocol::Http => true,
_ => false,
}
}
fn is_secure(&self) -> bool {
match *self {
Protocol::Https | Protocol::Wss => true,
@@ -303,18 +340,156 @@ impl Protocol {
}
}
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
struct Key {
host: String,
port: u16,
ssl: bool,
}
impl Key {
fn empty() -> Key {
Key{host: String::new(), port: 0, ssl: false}
}
}
#[derive(Debug)]
struct Conn(Instant, Connection);
pub struct Pool {
max_size: usize,
keep_alive: Duration,
max_lifetime: Duration,
pool: RefCell<HashMap<Key, VecDeque<Conn>>>,
to_close: RefCell<Vec<Connection>>,
}
impl Pool {
    /// Create a pool with default limits: at most 128 idle connections per
    /// key, a 15s idle keep-alive and a 75s total connection lifetime.
    fn new() -> Pool {
        Pool {
            max_size: 128,
            keep_alive: Duration::from_secs(15),
            max_lifetime: Duration::from_secs(75),
            pool: RefCell::new(HashMap::new()),
            to_close: RefCell::new(Vec::new()),
        }
    }

    /// Evict expired idle connections and drive graceful shutdown of the
    /// previously evicted ones.
    fn collect(&self) {
        let mut pool = self.pool.borrow_mut();
        let mut to_close = self.to_close.borrow_mut();

        // check keep-alive
        let now = Instant::now();
        for conns in pool.values_mut() {
            // entries are ordered oldest-first, so stop scanning at the
            // first connection that is still fresh
            while !conns.is_empty() {
                if (now - conns[0].0) > self.keep_alive
                    || (now - conns[0].1.ts) > self.max_lifetime
                {
                    let conn = conns.pop_front().unwrap().1;
                    to_close.push(conn);
                } else {
                    break
                }
            }
        }

        // check connections for shutdown
        let mut idx = 0;
        while idx < to_close.len() {
            match AsyncWrite::shutdown(&mut to_close[idx]) {
                // shutdown still in progress; keep it for the next pass
                Ok(Async::NotReady) => idx += 1,
                // completed (or errored); remove and drop the connection
                _ => {
                    to_close.swap_remove(idx);
                },
            }
        }
    }

    /// Take a reusable connection for `key`, if one is available.
    ///
    /// Expired entries encountered along the way are moved to `to_close`;
    /// connections whose socket shows unexpected data or EOF are discarded.
    fn query(&self, key: &Key) -> Option<Connection> {
        let mut pool = self.pool.borrow_mut();
        let mut to_close = self.to_close.borrow_mut();

        if let Some(ref mut connections) = pool.get_mut(key) {
            let now = Instant::now();
            // prefer the most recently released connection (LIFO)
            while let Some(conn) = connections.pop_back() {
                // check if it still usable
                if (now - conn.0) > self.keep_alive
                    || (now - conn.1.ts) > self.max_lifetime
                {
                    to_close.push(conn.1);
                } else {
                    let mut conn = conn.1;
                    let mut buf = [0; 2];
                    // probe the socket: a healthy idle connection must have
                    // nothing to read, i.e. the read returns WouldBlock
                    match conn.stream().read(&mut buf) {
                        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => (),
                        // unexpected inbound data: schedule a graceful
                        // shutdown and try the next candidate
                        Ok(n) if n > 0 => {
                            to_close.push(conn);
                            continue
                        },
                        // EOF (Ok(0)) or hard error: just drop it
                        Ok(_) | Err(_) => continue,
                    }
                    return Some(conn)
                }
            }
        }
        None
    }

    /// Return a connection to the pool unless its total lifetime has been
    /// exceeded; enforces the per-key `max_size` by evicting the oldest.
    fn release(&self, conn: Connection) {
        if (Instant::now() - conn.ts) < self.max_lifetime {
            let mut pool = self.pool.borrow_mut();
            if !pool.contains_key(&conn.key) {
                let key = conn.key.clone();
                let mut vec = VecDeque::new();
                vec.push_back(Conn(Instant::now(), conn));
                pool.insert(key, vec);
            } else {
                let vec = pool.get_mut(&conn.key).unwrap();
                vec.push_back(Conn(Instant::now(), conn));
                if vec.len() > self.max_size {
                    // pool is full for this key: evict the oldest entry
                    let conn = vec.pop_front().unwrap();
                    self.to_close.borrow_mut().push(conn.1);
                }
            }
        }
    }
}
/// A client connection, optionally owned by a `Pool` so it can be
/// returned for reuse via `release`.
pub struct Connection {
    // pool key (host, port, ssl) this connection belongs to
    key: Key,
    // underlying transport
    stream: Box<IoStream>,
    // owning pool, if any; `None` for standalone connections
    pool: Option<Rc<Pool>>,
    // creation time, used for `max_lifetime` checks
    ts: Instant,
}
impl fmt::Debug for Connection {
    /// Render as `Connection host:port` for diagnostics.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "Connection {host}:{port}",
               host = self.key.host, port = self.key.port)
    }
}
impl Connection {
    /// Wrap `stream` as a connection for `key`, owned by `pool` (if any),
    /// timestamped now for lifetime accounting.
    fn new(key: Key, pool: Option<Rc<Pool>>, stream: Box<IoStream>) -> Self {
        Connection {
            key, pool, stream,
            ts: Instant::now(),
        }
    }

    /// Mutable access to the underlying transport.
    pub fn stream(&mut self) -> &mut IoStream {
        &mut *self.stream
    }

    /// Build a standalone connection (not owned by any pool) from a raw
    /// IO stream.
    pub fn from_stream<T: IoStream>(io: T) -> Connection {
        // Fix: removed a stray, incomplete `Connection{stream: Box::new(io)}`
        // expression (leftover from a diff) that preceded this call; it was
        // missing the `key`/`pool`/`ts` fields and did not compile.
        Connection::new(Key::empty(), None, Box::new(io))
    }

    /// Hand the connection back to its owning pool, if any; a connection
    /// without a pool is simply dropped.
    pub fn release(mut self) {
        if let Some(pool) = self.pool.take() {
            pool.release(self)
        }
    }
}

View File

@@ -78,7 +78,6 @@ impl HttpResponseParser {
-> Poll<Option<Bytes>, PayloadError>
where T: IoStream
{
println!("PARSE payload, {:?}", self.decoder.is_some());
if self.decoder.is_some() {
loop {
// read payload

View File

@@ -171,7 +171,8 @@ impl Future for SendRequest {
};
let pl = Box::new(Pipeline {
body, conn, writer,
body, writer,
conn: Some(conn),
parser: Some(HttpResponseParser::default()),
parser_buf: BytesMut::new(),
disconnected: false,
@@ -208,7 +209,7 @@ impl Future for SendRequest {
pub(crate) struct Pipeline {
body: IoBody,
conn: Connection,
conn: Option<Connection>,
writer: HttpClientWriter,
parser: Option<HttpResponseParser>,
parser_buf: BytesMut,
@@ -249,30 +250,45 @@ impl RunningState {
impl Pipeline {
fn release_conn(&mut self) {
if let Some(conn) = self.conn.take() {
conn.release()
}
}
#[inline]
pub fn parse(&mut self) -> Poll<ClientResponse, HttpResponseParserError> {
match self.parser.as_mut().unwrap().parse(&mut self.conn, &mut self.parser_buf) {
Ok(Async::Ready(resp)) => {
// check content-encoding
if self.should_decompress {
if let Some(enc) = resp.headers().get(CONTENT_ENCODING) {
if let Ok(enc) = enc.to_str() {
match ContentEncoding::from(enc) {
ContentEncoding::Auto | ContentEncoding::Identity => (),
enc => self.decompress = Some(PayloadStream::new(enc)),
fn parse(&mut self) -> Poll<ClientResponse, HttpResponseParserError> {
if let Some(ref mut conn) = self.conn {
match self.parser.as_mut().unwrap().parse(conn, &mut self.parser_buf) {
Ok(Async::Ready(resp)) => {
// check content-encoding
if self.should_decompress {
if let Some(enc) = resp.headers().get(CONTENT_ENCODING) {
if let Ok(enc) = enc.to_str() {
match ContentEncoding::from(enc) {
ContentEncoding::Auto | ContentEncoding::Identity => (),
enc => self.decompress = Some(PayloadStream::new(enc)),
}
}
}
}
}
Ok(Async::Ready(resp))
Ok(Async::Ready(resp))
}
val => val,
}
val => val,
} else {
Ok(Async::NotReady)
}
}
#[inline]
pub fn poll(&mut self) -> Poll<Option<Bytes>, PayloadError> {
if self.conn.is_none() {
return Ok(Async::Ready(None))
}
let conn: &mut Connection = unsafe{ mem::transmute(self.conn.as_mut().unwrap())};
let mut need_run = false;
// need write?
@@ -286,7 +302,7 @@ impl Pipeline {
if self.parser.is_some() {
loop {
match self.parser.as_mut().unwrap()
.parse_payload(&mut self.conn, &mut self.parser_buf)?
.parse_payload(conn, &mut self.parser_buf)?
{
Async::Ready(Some(b)) => {
if let Some(ref mut decompress) = self.decompress {
@@ -314,6 +330,7 @@ impl Pipeline {
if let Some(mut decompress) = self.decompress.take() {
let res = decompress.feed_eof();
if let Some(b) = res? {
self.release_conn();
return Ok(Async::Ready(Some(b)))
}
}
@@ -321,13 +338,14 @@ impl Pipeline {
if need_run {
Ok(Async::NotReady)
} else {
self.release_conn();
Ok(Async::Ready(None))
}
}
#[inline]
pub fn poll_write(&mut self) -> Poll<(), Error> {
if self.write_state == RunningState::Done {
fn poll_write(&mut self) -> Poll<(), Error> {
if self.write_state == RunningState::Done || self.conn.is_none() {
return Ok(Async::Ready(()))
}
@@ -416,7 +434,7 @@ impl Pipeline {
}
// flush io but only if we need to
match self.writer.poll_completed(&mut self.conn, false) {
match self.writer.poll_completed(self.conn.as_mut().unwrap(), false) {
Ok(Async::Ready(_)) => {
if self.disconnected {
self.write_state = RunningState::Done;

View File

@@ -1,4 +1,5 @@
use std::{fmt, mem};
use std::fmt::Write as FmtWrite;
use std::io::Write;
use actix::{Addr, Unsync};
@@ -8,6 +9,7 @@ use http::{uri, HeaderMap, Method, Version, Uri, HttpTryFrom, Error as HttpError
use http::header::{self, HeaderName, HeaderValue};
use serde_json;
use serde::Serialize;
use percent_encoding::{USERINFO_ENCODE_SET, percent_encode};
use body::Body;
use error::Error;
@@ -26,7 +28,7 @@ pub struct ClientRequest {
upgrade: bool,
encoding: ContentEncoding,
response_decompress: bool,
buffer_capacity: Option<(usize, usize)>,
buffer_capacity: usize,
conn: ConnectionType,
}
@@ -49,7 +51,7 @@ impl Default for ClientRequest {
upgrade: false,
encoding: ContentEncoding::Auto,
response_decompress: true,
buffer_capacity: None,
buffer_capacity: 32_768,
conn: ConnectionType::Default,
}
}
@@ -177,7 +179,8 @@ impl ClientRequest {
self.response_decompress
}
pub fn buffer_capacity(&self) -> Option<(usize, usize)> {
/// Requested write buffer capacity
pub fn write_buffer_capacity(&self) -> usize {
self.buffer_capacity
}
@@ -464,12 +467,11 @@ impl ClientRequestBuilder {
}
/// Set write buffer capacity
pub fn buffer_capacity(&mut self,
low_watermark: usize,
high_watermark: usize) -> &mut Self
{
///
/// Default buffer capacity is 32kb
pub fn write_buffer_capacity(&mut self, cap: usize) -> &mut Self {
if let Some(parts) = parts(&mut self.request, &self.err) {
parts.buffer_capacity = Some((low_watermark, high_watermark));
parts.buffer_capacity = cap;
}
self
}
@@ -539,10 +541,14 @@ impl ClientRequestBuilder {
// set cookies
if let Some(ref mut jar) = self.cookies {
for cookie in jar.delta() {
request.headers.append(
header::COOKIE, HeaderValue::from_str(&cookie.to_string()).unwrap());
let mut cookie = String::new();
for c in jar.delta() {
let name = percent_encode(c.name().as_bytes(), USERINFO_ENCODE_SET);
let value = percent_encode(c.value().as_bytes(), USERINFO_ENCODE_SET);
let _ = write!(&mut cookie, "; {}={}", name, value);
}
request.headers.insert(
header::COOKIE, HeaderValue::from_str(&cookie.as_str()[2..]).unwrap());
}
request.body = body.into();
Ok(request)
@@ -593,3 +599,19 @@ fn parts<'a>(parts: &'a mut Option<ClientRequest>, err: &Option<HttpError>)
}
parts.as_mut()
}
impl fmt::Debug for ClientRequestBuilder {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if let Some(ref parts) = self.request {
let res = write!(f, "\nClientRequestBuilder {:?} {}:{}\n",
parts.version, parts.method, parts.uri);
let _ = write!(f, " headers:\n");
for (key, val) in parts.headers.iter() {
let _ = write!(f, " {:?}: {:?}\n", key, val);
}
res
} else {
write!(f, "ClientRequestBuilder(Consumed)")
}
}
}

View File

@@ -129,3 +129,20 @@ impl Stream for ClientResponse {
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_debug() {
let resp = ClientResponse::new(ClientMessage::default());
resp.as_mut().headers.insert(
header::COOKIE, HeaderValue::from_static("cookie1=value1"));
resp.as_mut().headers.insert(
header::COOKIE, HeaderValue::from_static("cookie2=value2"));
let dbg = format!("{:?}", resp);
assert!(dbg.contains("ClientResponse"));
}
}

View File

@@ -13,10 +13,11 @@ use http::header::{HeaderValue, DATE,
CONNECTION, CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING};
use flate2::Compression;
use flate2::write::{GzEncoder, DeflateEncoder};
#[cfg(feature="brotli")]
use brotli2::write::BrotliEncoder;
use body::{Body, Binary};
use headers::ContentEncoding;
use header::ContentEncoding;
use server::WriterState;
use server::shared::SharedBytes;
use server::encoding::{ContentEncoder, TransferEncoding};
@@ -24,8 +25,6 @@ use server::encoding::{ContentEncoder, TransferEncoding};
use client::ClientRequest;
const LOW_WATERMARK: usize = 1024;
const HIGH_WATERMARK: usize = 8 * LOW_WATERMARK;
const AVERAGE_HEADER_SIZE: usize = 30;
bitflags! {
@@ -42,9 +41,8 @@ pub(crate) struct HttpClientWriter {
written: u64,
headers_size: u32,
buffer: SharedBytes,
buffer_capacity: usize,
encoder: ContentEncoder,
low: usize,
high: usize,
}
impl HttpClientWriter {
@@ -55,10 +53,9 @@ impl HttpClientWriter {
flags: Flags::empty(),
written: 0,
headers_size: 0,
buffer_capacity: 0,
buffer,
encoder,
low: LOW_WATERMARK,
high: HIGH_WATERMARK,
}
}
@@ -70,12 +67,6 @@ impl HttpClientWriter {
// self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE)
// }
/// Set write buffer capacity
pub fn set_buffer_capacity(&mut self, low_watermark: usize, high_watermark: usize) {
self.low = low_watermark;
self.high = high_watermark;
}
fn write_to_stream<T: AsyncWrite>(&mut self, stream: &mut T) -> io::Result<WriterState> {
while !self.buffer.is_empty() {
match stream.write(self.buffer.as_ref()) {
@@ -87,7 +78,7 @@ impl HttpClientWriter {
let _ = self.buffer.split_to(n);
},
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if self.buffer.len() > self.high {
if self.buffer.len() > self.buffer_capacity {
return Ok(WriterState::Pause)
} else {
return Ok(WriterState::Done)
@@ -106,9 +97,6 @@ impl HttpClientWriter {
// prepare task
self.flags.insert(Flags::STARTED);
self.encoder = content_encoder(self.buffer.clone(), msg);
if let Some(capacity) = msg.buffer_capacity() {
self.set_buffer_capacity(capacity.0, capacity.1);
}
// render message
{
@@ -125,7 +113,9 @@ impl HttpClientWriter {
// status line
let _ = write!(buffer, "{} {} {:?}\r\n",
msg.method(), msg.uri().path(), msg.version());
msg.method(),
msg.uri().path_and_query().map(|u| u.as_str()).unwrap_or("/"),
msg.version());
// write headers
for (key, value) in msg.headers() {
@@ -153,6 +143,8 @@ impl HttpClientWriter {
self.written += bytes.len() as u64;
self.encoder.write(bytes)?;
}
} else {
self.buffer_capacity = msg.write_buffer_capacity();
}
}
Ok(())
@@ -168,7 +160,7 @@ impl HttpClientWriter {
}
}
if self.buffer.len() > self.high {
if self.buffer.len() > self.buffer_capacity {
Ok(WriterState::Pause)
} else {
Ok(WriterState::Done)
@@ -224,6 +216,7 @@ fn content_encoder(buf: SharedBytes, req: &mut ClientRequest) -> ContentEncoder
DeflateEncoder::new(transfer, Compression::default())),
ContentEncoding::Gzip => ContentEncoder::Gzip(
GzEncoder::new(transfer, Compression::default())),
#[cfg(feature="brotli")]
ContentEncoding::Br => ContentEncoder::Br(
BrotliEncoder::new(transfer, 5)),
ContentEncoding::Identity => ContentEncoder::Identity(transfer),
@@ -273,6 +266,7 @@ fn content_encoder(buf: SharedBytes, req: &mut ClientRequest) -> ContentEncoder
DeflateEncoder::new(transfer, Compression::default())),
ContentEncoding::Gzip => ContentEncoder::Gzip(
GzEncoder::new(transfer, Compression::default())),
#[cfg(feature="brotli")]
ContentEncoding::Br => ContentEncoder::Br(
BrotliEncoder::new(transfer, 5)),
ContentEncoding::Identity | ContentEncoding::Auto => ContentEncoder::Identity(transfer),

118
src/fs.rs
View File

@@ -21,11 +21,11 @@ use mime_guess::get_mime_type;
use header;
use error::Error;
use param::FromParam;
use handler::{Handler, Responder};
use handler::{Handler, RouteHandler, WrapHandler, Responder, Reply};
use httpmessage::HttpMessage;
use httprequest::HttpRequest;
use httpresponse::HttpResponse;
use httpcodes::{HttpOk, HttpFound, HttpMethodNotAllowed};
use httpcodes::{HttpOk, HttpFound, HttpNotFound, HttpMethodNotAllowed};
/// A file with an associated name; responds with the Content-Type based on the
/// file extension.
@@ -36,6 +36,7 @@ pub struct NamedFile {
md: Metadata,
modified: Option<SystemTime>,
cpu_pool: Option<CpuPool>,
only_get: bool,
}
impl NamedFile {
@@ -54,7 +55,14 @@ impl NamedFile {
let path = path.as_ref().to_path_buf();
let modified = md.modified().ok();
let cpu_pool = None;
Ok(NamedFile{path, file, md, modified, cpu_pool})
Ok(NamedFile{path, file, md, modified, cpu_pool, only_get: false})
}
/// Allow only GET and HEAD methods
#[inline]
pub fn only_get(mut self) -> Self {
self.only_get = true;
self
}
/// Returns reference to the underlying `File` object.
@@ -168,7 +176,7 @@ impl Responder for NamedFile {
type Error = io::Error;
fn respond_to(self, req: HttpRequest) -> Result<HttpResponse, io::Error> {
if *req.method() != Method::GET && *req.method() != Method::HEAD {
if self.only_get && *req.method() != Method::GET && *req.method() != Method::HEAD {
return Ok(HttpMethodNotAllowed.build()
.header(header::http::CONTENT_TYPE, "text/plain")
.header(header::http::ALLOW, "GET, HEAD")
@@ -215,7 +223,9 @@ impl Responder for NamedFile {
return Ok(resp.status(StatusCode::NOT_MODIFIED).finish().unwrap())
}
if *req.method() == Method::GET {
if *req.method() == Method::HEAD {
Ok(resp.finish().unwrap())
} else {
let reader = ChunkedReadFile {
size: self.md.len(),
offset: 0,
@@ -224,8 +234,6 @@ impl Responder for NamedFile {
fut: None,
};
Ok(resp.streaming(reader).unwrap())
} else {
Ok(resp.finish().unwrap())
}
}
}
@@ -354,27 +362,6 @@ impl Responder for Directory {
}
}
/// This enum represents all filesystem elements.
pub enum FilesystemElement {
File(NamedFile),
Directory(Directory),
Redirect(HttpResponse),
}
impl Responder for FilesystemElement {
type Item = HttpResponse;
type Error = io::Error;
fn respond_to(self, req: HttpRequest) -> Result<HttpResponse, io::Error> {
match self {
FilesystemElement::File(file) => file.respond_to(req),
FilesystemElement::Directory(dir) => dir.respond_to(req),
FilesystemElement::Redirect(resp) => Ok(resp),
}
}
}
/// Static files handling
///
/// `StaticFile` handler must be registered with `Application::handler()` method,
@@ -390,23 +377,24 @@ impl Responder for FilesystemElement {
/// .finish();
/// }
/// ```
pub struct StaticFiles {
pub struct StaticFiles<S> {
directory: PathBuf,
accessible: bool,
index: Option<String>,
show_index: bool,
cpu_pool: CpuPool,
default: Box<RouteHandler<S>>,
_chunk_size: usize,
_follow_symlinks: bool,
}
impl StaticFiles {
impl<S: 'static> StaticFiles<S> {
/// Create new `StaticFiles` instance
///
/// `dir` - base directory
///
/// `index` - show index for directory
pub fn new<T: Into<PathBuf>>(dir: T, index: bool) -> StaticFiles {
pub fn new<T: Into<PathBuf>>(dir: T, index: bool) -> StaticFiles<S> {
let dir = dir.into();
let (dir, access) = match dir.canonicalize() {
@@ -430,6 +418,7 @@ impl StaticFiles {
index: None,
show_index: index,
cpu_pool: CpuPool::new(40),
default: Box::new(WrapHandler::new(|_| HttpNotFound)),
_chunk_size: 0,
_follow_symlinks: false,
}
@@ -439,28 +428,30 @@ impl StaticFiles {
///
/// Redirects to specific index file for directory "/" instead of
/// showing files listing.
pub fn index_file<T: Into<String>>(mut self, index: T) -> StaticFiles {
pub fn index_file<T: Into<String>>(mut self, index: T) -> StaticFiles<S> {
self.index = Some(index.into());
self
}
/// Sets default resource which is used when no matched file could be found.
pub fn default_handler<H: Handler<S>>(mut self, handler: H) -> StaticFiles<S> {
self.default = Box::new(WrapHandler::new(handler));
self
}
}
impl<S> Handler<S> for StaticFiles {
type Result = Result<FilesystemElement, io::Error>;
impl<S: 'static> Handler<S> for StaticFiles<S> {
type Result = Result<Reply, Error>;
fn handle(&mut self, req: HttpRequest<S>) -> Self::Result {
if !self.accessible {
Err(io::Error::new(io::ErrorKind::NotFound, "not found"))
Ok(self.default.handle(req))
} else {
let path = if let Some(path) = req.match_info().get("tail") {
path
} else {
return Err(io::Error::new(io::ErrorKind::NotFound, "not found"))
let relpath = match req.match_info().get("tail").map(PathBuf::from_param) {
Some(Ok(path)) => path,
_ => return Ok(self.default.handle(req))
};
let relpath = PathBuf::from_param(path)
.map_err(|_| io::Error::new(io::ErrorKind::NotFound, "not found"))?;
// full filepath
let path = self.directory.join(&relpath).canonicalize()?;
@@ -474,20 +465,21 @@ impl<S> Handler<S> for StaticFiles {
new_path.push('/');
}
new_path.push_str(redir_index);
Ok(FilesystemElement::Redirect(
HttpFound
.build()
.header(header::http::LOCATION, new_path.as_str())
.finish().unwrap()))
HttpFound.build()
.header(header::http::LOCATION, new_path.as_str())
.finish().unwrap()
.respond_to(req.without_state())
} else if self.show_index {
Ok(FilesystemElement::Directory(
Directory::new(self.directory.clone(), path)))
Directory::new(self.directory.clone(), path)
.respond_to(req.without_state())?
.respond_to(req.without_state())
} else {
Err(io::Error::new(io::ErrorKind::NotFound, "not found"))
Ok(self.default.handle(req))
}
} else {
Ok(FilesystemElement::File(
NamedFile::open(path)?.set_cpu_pool(self.cpu_pool.clone())))
NamedFile::open(path)?.set_cpu_pool(self.cpu_pool.clone())
.respond_to(req.without_state())?
.respond_to(req.without_state())
}
}
}
@@ -517,25 +509,38 @@ mod tests {
let req = TestRequest::default().method(Method::POST).finish();
let file = NamedFile::open("Cargo.toml").unwrap();
let resp = file.respond_to(req).unwrap();
let resp = file.only_get().respond_to(req).unwrap();
assert_eq!(resp.status(), StatusCode::METHOD_NOT_ALLOWED);
}
#[test]
fn test_named_file_any_method() {
let req = TestRequest::default().method(Method::POST).finish();
let file = NamedFile::open("Cargo.toml").unwrap();
let resp = file.respond_to(req).unwrap();
assert_eq!(resp.status(), StatusCode::OK);
}
#[test]
fn test_static_files() {
let mut st = StaticFiles::new(".", true);
st.accessible = false;
assert!(st.handle(HttpRequest::default()).is_err());
let resp = st.handle(HttpRequest::default()).respond_to(HttpRequest::default()).unwrap();
let resp = resp.as_response().expect("HTTP Response");
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
st.accessible = true;
st.show_index = false;
assert!(st.handle(HttpRequest::default()).is_err());
let resp = st.handle(HttpRequest::default()).respond_to(HttpRequest::default()).unwrap();
let resp = resp.as_response().expect("HTTP Response");
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
let mut req = HttpRequest::default();
req.match_info_mut().add("tail", "");
st.show_index = true;
let resp = st.handle(req).respond_to(HttpRequest::default()).unwrap();
let resp = resp.as_response().expect("HTTP Response");
assert_eq!(resp.headers().get(header::CONTENT_TYPE).unwrap(), "text/html; charset=utf-8");
assert!(resp.body().is_binary());
assert!(format!("{:?}", resp.body()).contains("README.md"));
@@ -548,6 +553,7 @@ mod tests {
req.match_info_mut().add("tail", "guide");
let resp = st.handle(req).respond_to(HttpRequest::default()).unwrap();
let resp = resp.as_response().expect("HTTP Response");
assert_eq!(resp.status(), StatusCode::FOUND);
assert_eq!(resp.headers().get(header::LOCATION).unwrap(), "/guide/index.html");
@@ -555,6 +561,7 @@ mod tests {
req.match_info_mut().add("tail", "guide/");
let resp = st.handle(req).respond_to(HttpRequest::default()).unwrap();
let resp = resp.as_response().expect("HTTP Response");
assert_eq!(resp.status(), StatusCode::FOUND);
assert_eq!(resp.headers().get(header::LOCATION).unwrap(), "/guide/index.html");
}
@@ -566,6 +573,7 @@ mod tests {
req.match_info_mut().add("tail", "examples/basics");
let resp = st.handle(req).respond_to(HttpRequest::default()).unwrap();
let resp = resp.as_response().expect("HTTP Response");
assert_eq!(resp.status(), StatusCode::FOUND);
assert_eq!(resp.headers().get(header::LOCATION).unwrap(), "/examples/basics/Cargo.toml");
}

View File

@@ -34,6 +34,63 @@ pub trait Responder {
fn respond_to(self, req: HttpRequest) -> Result<Self::Item, Self::Error>;
}
/// Combines two different responder types into a single type
///
/// ```rust
/// # extern crate actix_web;
/// # extern crate futures;
/// # use futures::future::Future;
/// use actix_web::AsyncResponder;
/// use futures::future::result;
/// use actix_web::{Either, Error, HttpRequest, HttpResponse, httpcodes};
///
/// type RegisterResult = Either<HttpResponse, Box<Future<Item=HttpResponse, Error=Error>>>;
///
///
/// fn index(req: HttpRequest) -> RegisterResult {
/// if is_a_variant() { // <- choose variant A
/// Either::A(
/// httpcodes::HttpBadRequest.with_body("Bad data"))
/// } else {
/// Either::B( // <- variant B
/// result(HttpResponse::Ok()
/// .content_type("text/html")
/// .body(format!("Hello!"))
/// .map_err(|e| e.into())).responder())
/// }
/// }
/// # fn is_a_variant() -> bool { true }
/// # fn main() {}
/// ```
#[derive(Debug)]
pub enum Either<A, B> {
/// First branch of the type
A(A),
/// Second branch of the type
B(B),
}
impl<A, B> Responder for Either<A, B>
where A: Responder, B: Responder
{
type Item = Reply;
type Error = Error;
fn respond_to(self, req: HttpRequest) -> Result<Reply, Error> {
match self {
Either::A(a) => match a.respond_to(req) {
Ok(val) => Ok(val.into()),
Err(err) => Err(err.into()),
},
Either::B(b) => match b.respond_to(req) {
Ok(val) => Ok(val.into()),
Err(err) => Err(err.into()),
},
}
}
}
#[doc(hidden)]
/// Convenience trait that convert `Future` object into `Boxed` future
pub trait AsyncResponder<I, E>: Sized {

View File

@@ -119,6 +119,7 @@ pub enum ContentEncoding {
/// Automatically select encoding based on encoding negotiation
Auto,
/// A format using the Brotli algorithm
#[cfg(feature="brotli")]
Br,
/// A format using the zlib structure with deflate algorithm
Deflate,
@@ -141,6 +142,7 @@ impl ContentEncoding {
#[inline]
pub fn as_str(&self) -> &'static str {
match *self {
#[cfg(feature="brotli")]
ContentEncoding::Br => "br",
ContentEncoding::Gzip => "gzip",
ContentEncoding::Deflate => "deflate",
@@ -150,6 +152,7 @@ impl ContentEncoding {
/// default quality value
pub fn quality(&self) -> f64 {
match *self {
#[cfg(feature="brotli")]
ContentEncoding::Br => 1.1,
ContentEncoding::Gzip => 1.0,
ContentEncoding::Deflate => 0.9,
@@ -162,11 +165,11 @@ impl ContentEncoding {
impl<'a> From<&'a str> for ContentEncoding {
fn from(s: &'a str) -> ContentEncoding {
match s.trim().to_lowercase().as_ref() {
#[cfg(feature="brotli")]
"br" => ContentEncoding::Br,
"gzip" => ContentEncoding::Gzip,
"deflate" => ContentEncoding::Deflate,
"identity" => ContentEncoding::Identity,
_ => ContentEncoding::Auto,
_ => ContentEncoding::Identity,
}
}
}

View File

@@ -97,7 +97,8 @@ impl HttpRequest<()> {
/// Construct a new Request.
#[inline]
pub fn new(method: Method, uri: Uri,
version: Version, headers: HeaderMap, payload: Option<Payload>) -> HttpRequest
version: Version, headers: HeaderMap, payload: Option<Payload>)
-> HttpRequest
{
HttpRequest(
SharedHttpInnerMessage::from_message(HttpInnerMessage {
@@ -120,7 +121,7 @@ impl HttpRequest<()> {
}
#[inline(always)]
#[cfg_attr(feature = "cargo-clippy", allow(inline_always))]
#[cfg_attr(feature="cargo-clippy", allow(inline_always))]
pub(crate) fn from_message(msg: SharedHttpInnerMessage) -> HttpRequest {
HttpRequest(msg, None, None)
}
@@ -335,7 +336,11 @@ impl<S> HttpRequest<S> {
let mut cookies = Vec::new();
for hdr in msg.headers.get_all(header::COOKIE) {
let s = str::from_utf8(hdr.as_bytes()).map_err(CookieParseError::from)?;
cookies.push(Cookie::parse_encoded(s)?.into_owned());
for cookie_str in s.split(';').map(|s| s.trim()) {
if !cookie_str.is_empty() {
cookies.push(Cookie::parse_encoded(cookie_str)?.into_owned());
}
}
}
msg.cookies = Some(cookies)
}
@@ -384,6 +389,15 @@ impl<S> HttpRequest<S> {
self.as_ref().method == Method::CONNECT
}
/// Set read buffer capacity
///
/// Default buffer capacity is 32Kb.
pub fn set_read_buffer_capacity(&mut self, cap: usize) {
if let Some(ref mut payload) = self.as_mut().payload {
payload.set_read_buffer_capacity(cap)
}
}
#[cfg(test)]
pub(crate) fn payload(&self) -> &Payload {
let msg = self.as_mut();

View File

@@ -18,6 +18,10 @@ use handler::Responder;
use header::{Header, IntoHeaderValue, ContentEncoding};
use httprequest::HttpRequest;
/// max write buffer size 64k
pub(crate) const MAX_WRITE_BUFFER_SIZE: usize = 65_536;
/// Represents various types of connection
#[derive(Copy, Clone, PartialEq, Debug)]
pub enum ConnectionType {
@@ -198,6 +202,16 @@ impl HttpResponse {
pub(crate) fn set_response_size(&mut self, size: u64) {
self.get_mut().response_size = size;
}
/// Set write buffer capacity
pub fn write_buffer_capacity(&self) -> usize {
self.get_ref().write_capacity
}
/// Set write buffer capacity
pub fn set_write_buffer_capacity(&mut self, cap: usize) {
self.get_mut().write_capacity = cap;
}
}
impl fmt::Debug for HttpResponse {
@@ -462,6 +476,20 @@ impl HttpResponseBuilder {
self
}
/// Set write buffer capacity
///
/// This parameter makes sense only for streaming response
/// or actor. If write buffer reaches specified capacity, stream or actor get
/// paused.
///
/// Default write buffer capacity is 64kb
pub fn write_buffer_capacity(&mut self, cap: usize) -> &mut Self {
if let Some(parts) = parts(&mut self.response, &self.err) {
parts.write_capacity = cap;
}
self
}
/// Set a body and generate `HttpResponse`.
///
/// `HttpResponseBuilder` can not be used after this call.
@@ -692,6 +720,7 @@ struct InnerHttpResponse {
chunked: Option<bool>,
encoding: Option<ContentEncoding>,
connection_type: Option<ConnectionType>,
write_capacity: usize,
response_size: u64,
error: Option<Error>,
}
@@ -710,6 +739,7 @@ impl InnerHttpResponse {
encoding: None,
connection_type: None,
response_size: 0,
write_capacity: MAX_WRITE_BUFFER_SIZE,
error: None,
}
}
@@ -763,6 +793,7 @@ impl Pool {
inner.connection_type = None;
inner.response_size = 0;
inner.error = None;
inner.write_capacity = MAX_WRITE_BUFFER_SIZE;
v.push_front(inner);
}
})
@@ -781,7 +812,10 @@ mod tests {
#[test]
fn test_debug() {
let resp = HttpResponse::Ok().finish().unwrap();
let resp = HttpResponse::Ok()
.header(COOKIE, HeaderValue::from_static("cookie1=value1; "))
.header(COOKIE, HeaderValue::from_static("cookie2=value2; "))
.finish().unwrap();
let dbg = format!("{:?}", resp);
assert!(dbg.contains("HttpResponse"));
}
@@ -789,8 +823,8 @@ mod tests {
#[test]
fn test_response_cookies() {
let mut headers = HeaderMap::new();
headers.insert(COOKIE,
HeaderValue::from_static("cookie1=value1; cookie2=value2"));
headers.insert(COOKIE, HeaderValue::from_static("cookie1=value1"));
headers.insert(COOKIE, HeaderValue::from_static("cookie2=value2"));
let req = HttpRequest::new(
Method::GET, Uri::from_str("/").unwrap(), Version::HTTP_11, headers, None);
@@ -813,7 +847,7 @@ mod tests {
let mut val: Vec<_> = resp.headers().get_all("Set-Cookie")
.iter().map(|v| v.to_str().unwrap().to_owned()).collect();
val.sort();
assert!(val[0].starts_with("cookie1=; Max-Age=0;"));
assert!(val[0].starts_with("cookie2=; Max-Age=0;"));
assert_eq!(
val[1],"name=value; HttpOnly; Path=/test; Domain=www.rust-lang.org; Max-Age=86400");
}

View File

@@ -2,6 +2,7 @@ use bytes::{Bytes, BytesMut};
use futures::{Poll, Future, Stream};
use http::header::CONTENT_LENGTH;
use mime;
use serde_json;
use serde::Serialize;
use serde::de::DeserializeOwned;
@@ -82,7 +83,6 @@ impl<T: Serialize> Responder for Json<T> {
/// ```
pub struct JsonBody<T, U: DeserializeOwned>{
limit: usize,
ct: &'static str,
req: Option<T>,
fut: Option<Box<Future<Item=U, Error=JsonPayloadError>>>,
}
@@ -95,7 +95,6 @@ impl<T, U: DeserializeOwned> JsonBody<T, U> {
limit: 262_144,
req: Some(req),
fut: None,
ct: "application/json",
}
}
@@ -104,15 +103,6 @@ impl<T, U: DeserializeOwned> JsonBody<T, U> {
self.limit = limit;
self
}
/// Set allowed content type.
///
/// By default *application/json* content type is used. Set content type
/// to empty string if you want to disable content type check.
pub fn content_type(mut self, ct: &'static str) -> Self {
self.ct = ct;
self
}
}
impl<T, U: DeserializeOwned + 'static> Future for JsonBody<T, U>
@@ -135,7 +125,13 @@ impl<T, U: DeserializeOwned + 'static> Future for JsonBody<T, U>
}
}
// check content-type
if !self.ct.is_empty() && req.content_type() != self.ct {
let json = if let Ok(Some(mime)) = req.mime_type() {
mime.subtype() == mime::JSON || mime.suffix() == Some(mime::JSON)
} else {
false
};
if !json {
return Err(JsonPayloadError::ContentType)
}
@@ -200,8 +196,8 @@ mod tests {
let mut req = HttpRequest::default();
req.headers_mut().insert(header::CONTENT_TYPE,
header::HeaderValue::from_static("application/json"));
let mut json = req.json::<MyObject>().content_type("text/json");
header::HeaderValue::from_static("application/text"));
let mut json = req.json::<MyObject>();
assert_eq!(json.poll().err().unwrap(), JsonPayloadError::ContentType);
let mut req = HttpRequest::default();

View File

@@ -79,6 +79,7 @@ extern crate libc;
extern crate serde;
extern crate serde_json;
extern crate flate2;
#[cfg(feature="brotli")]
extern crate brotli2;
extern crate encoding;
extern crate percent_encoding;
@@ -136,7 +137,7 @@ pub use application::Application;
pub use httpmessage::HttpMessage;
pub use httprequest::HttpRequest;
pub use httpresponse::HttpResponse;
pub use handler::{Reply, Responder, NormalizePath, AsyncResponder};
pub use handler::{Either, Reply, Responder, NormalizePath, AsyncResponder};
pub use route::Route;
pub use resource::Resource;
pub use context::HttpContext;

View File

@@ -349,8 +349,7 @@ impl<S> Middleware<S> for Cors {
if self.send_wildcard {
resp.headers_mut().insert(
header::ACCESS_CONTROL_ALLOW_ORIGIN, HeaderValue::from_static("*"));
} else {
let origin = req.headers().get(header::ORIGIN).unwrap();
} else if let Some(origin) = req.headers().get(header::ORIGIN) {
resp.headers_mut().insert(
header::ACCESS_CONTROL_ALLOW_ORIGIN, origin.clone());
}
@@ -807,6 +806,25 @@ mod tests {
assert!(cors.start(&mut req).unwrap().is_done());
}
#[test]
fn test_no_origin_response() {
let cors = Cors::build().finish().unwrap();
let mut req = TestRequest::default().method(Method::GET).finish();
let resp: HttpResponse = HttpOk.into();
let resp = cors.response(&mut req, resp).unwrap().response();
assert!(resp.headers().get(header::ACCESS_CONTROL_ALLOW_ORIGIN).is_none());
let mut req = TestRequest::with_header(
"Origin", "https://www.example.com")
.method(Method::OPTIONS)
.finish();
let resp = cors.response(&mut req, resp).unwrap().response();
assert_eq!(
&b"https://www.example.com"[..],
resp.headers().get(header::ACCESS_CONTROL_ALLOW_ORIGIN).unwrap().as_bytes());
}
#[test]
fn test_response() {
let cors = Cors::build()

View File

@@ -130,7 +130,7 @@ impl<S> InnerMultipart<S> where S: Stream<Item=Bytes, Error=PayloadError> {
fn read_headers(payload: &mut PayloadHelper<S>) -> Poll<HeaderMap, MultipartError>
{
match payload.readuntil(b"\r\n\r\n")? {
match payload.read_until(b"\r\n\r\n")? {
Async::NotReady => Ok(Async::NotReady),
Async::Ready(None) => Err(MultipartError::Incomplete),
Async::Ready(Some(bytes)) => {
@@ -469,23 +469,23 @@ impl<S> InnerField<S> where S: Stream<Item=Bytes, Error=PayloadError> {
fn read_stream(payload: &mut PayloadHelper<S>, boundary: &str)
-> Poll<Option<Bytes>, MultipartError>
{
match payload.readuntil(b"\r")? {
match payload.read_until(b"\r")? {
Async::NotReady => Ok(Async::NotReady),
Async::Ready(None) => Err(MultipartError::Incomplete),
Async::Ready(Some(mut chunk)) => {
if chunk.len() == 1 {
payload.unread_data(chunk);
match payload.readexactly(boundary.len() + 4)? {
match payload.read_exact(boundary.len() + 4)? {
Async::NotReady => Ok(Async::NotReady),
Async::Ready(None) => Err(MultipartError::Incomplete),
Async::Ready(Some(chunk)) => {
if &chunk[..2] == b"\r\n" && &chunk[2..4] == b"--" &&
&chunk[4..] == boundary.as_bytes()
{
payload.unread_data(chunk.freeze());
payload.unread_data(chunk);
Ok(Async::Ready(None))
} else {
Ok(Async::Ready(Some(chunk.freeze())))
Ok(Async::Ready(Some(chunk)))
}
}
}

View File

@@ -5,9 +5,14 @@ use std::cell::RefCell;
use std::collections::VecDeque;
use bytes::{Bytes, BytesMut};
use futures::{Async, Poll, Stream};
use futures::task::{Task, current as current_task};
use error::PayloadError;
/// max buffer size 32k
pub(crate) const MAX_BUFFER_SIZE: usize = 32_768;
#[derive(Debug, PartialEq)]
pub(crate) enum PayloadStatus {
Read,
@@ -76,6 +81,14 @@ impl Payload {
pub(crate) fn readall(&self) -> Option<Bytes> {
self.inner.borrow_mut().readall()
}
#[inline]
/// Set read buffer capacity
///
/// Default buffer capacity is 32Kb.
pub fn set_read_buffer_capacity(&mut self, cap: usize) {
self.inner.borrow_mut().capacity = cap;
}
}
impl Stream for Payload {
@@ -117,18 +130,21 @@ pub struct PayloadSender {
impl PayloadWriter for PayloadSender {
#[inline]
fn set_error(&mut self, err: PayloadError) {
if let Some(shared) = self.inner.upgrade() {
shared.borrow_mut().set_error(err)
}
}
#[inline]
fn feed_eof(&mut self) {
if let Some(shared) = self.inner.upgrade() {
shared.borrow_mut().feed_eof()
}
}
#[inline]
fn feed_data(&mut self, data: Bytes) {
if let Some(shared) = self.inner.upgrade() {
shared.borrow_mut().feed_data(data)
@@ -158,6 +174,8 @@ struct Inner {
err: Option<PayloadError>,
need_read: bool,
items: VecDeque<Bytes>,
capacity: usize,
task: Option<Task>,
}
impl Inner {
@@ -169,27 +187,37 @@ impl Inner {
err: None,
items: VecDeque::new(),
need_read: true,
capacity: MAX_BUFFER_SIZE,
task: None,
}
}
#[inline]
fn set_error(&mut self, err: PayloadError) {
self.err = Some(err);
}
#[inline]
fn feed_eof(&mut self) {
self.eof = true;
}
#[inline]
fn feed_data(&mut self, data: Bytes) {
self.len += data.len();
self.need_read = false;
self.items.push_back(data);
self.need_read = self.len < self.capacity;
if let Some(task) = self.task.take() {
task.notify()
}
}
#[inline]
fn eof(&self) -> bool {
self.items.is_empty() && self.eof
}
#[inline]
fn len(&self) -> usize {
self.len
}
@@ -214,6 +242,13 @@ impl Inner {
fn readany(&mut self) -> Poll<Option<Bytes>, PayloadError> {
if let Some(data) = self.items.pop_front() {
self.len -= data.len();
self.need_read = self.len < self.capacity;
#[cfg(not(test))]
{
if self.need_read && self.task.is_none() {
self.task = Some(current_task());
}
}
Ok(Async::Ready(Some(data)))
} else if let Some(err) = self.err.take() {
Err(err)
@@ -221,6 +256,12 @@ impl Inner {
Ok(Async::Ready(None))
} else {
self.need_read = true;
#[cfg(not(test))]
{
if self.task.is_none() {
self.task = Some(current_task());
}
}
Ok(Async::NotReady)
}
}
@@ -247,6 +288,7 @@ impl<S> PayloadHelper<S> where S: Stream<Item=Bytes, Error=PayloadError> {
}
}
#[inline]
fn poll_stream(&mut self) -> Poll<bool, PayloadError> {
self.stream.poll().map(|res| {
match res {
@@ -261,6 +303,7 @@ impl<S> PayloadHelper<S> where S: Stream<Item=Bytes, Error=PayloadError> {
})
}
#[inline]
pub fn readany(&mut self) -> Poll<Option<Bytes>, PayloadError> {
if let Some(data) = self.items.pop_front() {
self.len -= data.len();
@@ -274,25 +317,85 @@ impl<S> PayloadHelper<S> where S: Stream<Item=Bytes, Error=PayloadError> {
}
}
pub fn readexactly(&mut self, size: usize) -> Poll<Option<BytesMut>, PayloadError> {
#[inline]
pub fn can_read(&mut self, size: usize) -> Poll<Option<bool>, PayloadError> {
if size <= self.len {
let mut buf = BytesMut::with_capacity(size);
while buf.len() < size {
Ok(Async::Ready(Some(true)))
} else {
match self.poll_stream()? {
Async::Ready(true) => self.can_read(size),
Async::Ready(false) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady),
}
}
}
#[inline]
pub fn get_chunk(&mut self) -> Poll<Option<&[u8]>, PayloadError> {
if self.items.is_empty() {
match self.poll_stream()? {
Async::Ready(true) => (),
Async::Ready(false) => return Ok(Async::Ready(None)),
Async::NotReady => return Ok(Async::NotReady),
}
}
match self.items.front().map(|c| c.as_ref()) {
Some(chunk) => Ok(Async::Ready(Some(chunk))),
None => Ok(Async::NotReady),
}
}
#[inline]
pub fn read_exact(&mut self, size: usize) -> Poll<Option<Bytes>, PayloadError> {
if size <= self.len {
self.len -= size;
let mut chunk = self.items.pop_front().unwrap();
if size < chunk.len() {
let buf = chunk.split_to(size);
self.items.push_front(chunk);
Ok(Async::Ready(Some(buf)))
}
else if size == chunk.len() {
Ok(Async::Ready(Some(chunk)))
}
else {
let mut buf = BytesMut::with_capacity(size);
buf.extend_from_slice(&chunk);
while buf.len() < size {
let mut chunk = self.items.pop_front().unwrap();
let rem = cmp::min(size - buf.len(), chunk.len());
buf.extend_from_slice(&chunk.split_to(rem));
if !chunk.is_empty() {
self.items.push_front(chunk);
}
}
Ok(Async::Ready(Some(buf.freeze())))
}
} else {
match self.poll_stream()? {
Async::Ready(true) => self.read_exact(size),
Async::Ready(false) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady),
}
}
}
#[inline]
pub fn drop_payload(&mut self, size: usize) {
if size <= self.len {
self.len -= size;
let mut len = 0;
while len < size {
let mut chunk = self.items.pop_front().unwrap();
let rem = cmp::min(size - buf.len(), chunk.len());
self.len -= rem;
buf.extend_from_slice(&chunk.split_to(rem));
if !chunk.is_empty() {
let rem = cmp::min(size-len, chunk.len());
len += rem;
if rem < chunk.len() {
chunk.split_to(rem);
self.items.push_front(chunk);
}
}
return Ok(Async::Ready(Some(buf)))
}
match self.poll_stream()? {
Async::Ready(true) => self.readexactly(size),
Async::Ready(false) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady),
}
}
@@ -317,7 +420,7 @@ impl<S> PayloadHelper<S> where S: Stream<Item=Bytes, Error=PayloadError> {
}
}
pub fn readuntil(&mut self, line: &[u8]) -> Poll<Option<Bytes>, PayloadError> {
pub fn read_until(&mut self, line: &[u8]) -> Poll<Option<Bytes>, PayloadError> {
let mut idx = 0;
let mut num = 0;
let mut offset = 0;
@@ -366,14 +469,14 @@ impl<S> PayloadHelper<S> where S: Stream<Item=Bytes, Error=PayloadError> {
}
match self.poll_stream()? {
Async::Ready(true) => self.readuntil(line),
Async::Ready(true) => self.read_until(line),
Async::Ready(false) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady),
}
}
pub fn readline(&mut self) -> Poll<Option<Bytes>, PayloadError> {
self.readuntil(b"\n")
self.read_until(b"\n")
}
pub fn unread_data(&mut self, data: Bytes) {
@@ -486,21 +589,21 @@ mod tests {
let (mut sender, payload) = Payload::new(false);
let mut payload = PayloadHelper::new(payload);
assert_eq!(Async::NotReady, payload.readexactly(2).ok().unwrap());
assert_eq!(Async::NotReady, payload.read_exact(2).ok().unwrap());
sender.feed_data(Bytes::from("line1"));
sender.feed_data(Bytes::from("line2"));
assert_eq!(Async::Ready(Some(BytesMut::from("li"))),
payload.readexactly(2).ok().unwrap());
assert_eq!(Async::Ready(Some(Bytes::from_static(b"li"))),
payload.read_exact(2).ok().unwrap());
assert_eq!(payload.len, 3);
assert_eq!(Async::Ready(Some(BytesMut::from("ne1l"))),
payload.readexactly(4).ok().unwrap());
assert_eq!(Async::Ready(Some(Bytes::from_static(b"ne1l"))),
payload.read_exact(4).ok().unwrap());
assert_eq!(payload.len, 4);
sender.set_error(PayloadError::Incomplete);
payload.readexactly(10).err().unwrap();
payload.read_exact(10).err().unwrap();
let res: Result<(), ()> = Ok(());
result(res)
@@ -513,21 +616,21 @@ mod tests {
let (mut sender, payload) = Payload::new(false);
let mut payload = PayloadHelper::new(payload);
assert_eq!(Async::NotReady, payload.readuntil(b"ne").ok().unwrap());
assert_eq!(Async::NotReady, payload.read_until(b"ne").ok().unwrap());
sender.feed_data(Bytes::from("line1"));
sender.feed_data(Bytes::from("line2"));
assert_eq!(Async::Ready(Some(Bytes::from("line"))),
payload.readuntil(b"ne").ok().unwrap());
payload.read_until(b"ne").ok().unwrap());
assert_eq!(payload.len, 1);
assert_eq!(Async::Ready(Some(Bytes::from("1line2"))),
payload.readuntil(b"2").ok().unwrap());
payload.read_until(b"2").ok().unwrap());
assert_eq!(payload.len, 0);
sender.set_error(PayloadError::Incomplete);
payload.readuntil(b"b").err().unwrap();
payload.read_until(b"b").err().unwrap();
let res: Result<(), ()> = Ok(());
result(res)

View File

@@ -481,11 +481,16 @@ impl<S: 'static, H> ProcessResponse<S, H> {
}
}
// always poll stream or actor for the first time
match self.resp.replace_body(Body::Empty) {
Body::Streaming(stream) =>
self.iostate = IOState::Payload(stream),
Body::Actor(ctx) =>
self.iostate = IOState::Actor(ctx),
Body::Streaming(stream) => {
self.iostate = IOState::Payload(stream);
continue
},
Body::Actor(ctx) => {
self.iostate = IOState::Actor(ctx);
continue
},
_ => (),
}

View File

@@ -1,6 +1,7 @@
use std::rc::Rc;
use std::marker::PhantomData;
use smallvec::SmallVec;
use http::{Method, StatusCode};
use pred;
@@ -34,7 +35,7 @@ use httpresponse::HttpResponse;
pub struct Resource<S=()> {
name: String,
state: PhantomData<S>,
routes: Vec<Route<S>>,
routes: SmallVec<[Route<S>; 3]>,
middlewares: Rc<Vec<Box<Middleware<S>>>>,
}
@@ -43,7 +44,7 @@ impl<S> Default for Resource<S> {
Resource {
name: String::new(),
state: PhantomData,
routes: Vec::new(),
routes: SmallVec::new(),
middlewares: Rc::new(Vec::new()) }
}
}
@@ -54,7 +55,7 @@ impl<S> Resource<S> {
Resource {
name: String::new(),
state: PhantomData,
routes: Vec::new(),
routes: SmallVec::new(),
middlewares: Rc::new(Vec::new()) }
}

View File

@@ -3,6 +3,7 @@ use std::io::{Read, Write};
use std::fmt::Write as FmtWrite;
use std::str::FromStr;
use bytes::{Bytes, BytesMut, BufMut};
use http::{Version, Method, HttpTryFrom};
use http::header::{HeaderMap, HeaderValue,
ACCEPT_ENCODING, CONNECTION,
@@ -10,8 +11,8 @@ use http::header::{HeaderMap, HeaderValue,
use flate2::Compression;
use flate2::read::GzDecoder;
use flate2::write::{GzEncoder, DeflateDecoder, DeflateEncoder};
#[cfg(feature="brotli")]
use brotli2::write::{BrotliDecoder, BrotliEncoder};
use bytes::{Bytes, BytesMut, BufMut};
use header::ContentEncoding;
use body::{Body, Binary};
@@ -144,6 +145,7 @@ impl PayloadWriter for EncodedPayload {
pub(crate) enum Decoder {
Deflate(Box<DeflateDecoder<Writer>>),
Gzip(Option<Box<GzDecoder<Wrapper>>>),
#[cfg(feature="brotli")]
Br(Box<BrotliDecoder<Writer>>),
Identity,
}
@@ -214,6 +216,7 @@ pub(crate) struct PayloadStream {
impl PayloadStream {
pub fn new(enc: ContentEncoding) -> PayloadStream {
let dec = match enc {
#[cfg(feature="brotli")]
ContentEncoding::Br => Decoder::Br(
Box::new(BrotliDecoder::new(Writer::new()))),
ContentEncoding::Deflate => Decoder::Deflate(
@@ -229,6 +232,7 @@ impl PayloadStream {
pub fn feed_eof(&mut self) -> io::Result<Option<Bytes>> {
match self.decoder {
#[cfg(feature="brotli")]
Decoder::Br(ref mut decoder) => {
match decoder.finish() {
Ok(mut writer) => {
@@ -278,6 +282,7 @@ impl PayloadStream {
pub fn feed_data(&mut self, data: Bytes) -> io::Result<Option<Bytes>> {
match self.decoder {
#[cfg(feature="brotli")]
Decoder::Br(ref mut decoder) => {
match decoder.write_all(&data) {
Ok(_) => {
@@ -346,6 +351,7 @@ impl PayloadStream {
pub(crate) enum ContentEncoder {
Deflate(DeflateEncoder<TransferEncoding>),
Gzip(GzEncoder<TransferEncoding>),
#[cfg(feature="brotli")]
Br(BrotliEncoder<TransferEncoding>),
Identity(TransferEncoding),
}
@@ -412,6 +418,7 @@ impl ContentEncoder {
DeflateEncoder::new(transfer, Compression::default())),
ContentEncoding::Gzip => ContentEncoder::Gzip(
GzEncoder::new(transfer, Compression::default())),
#[cfg(feature="brotli")]
ContentEncoding::Br => ContentEncoder::Br(
BrotliEncoder::new(transfer, 5)),
ContentEncoding::Identity => ContentEncoder::Identity(transfer),
@@ -464,10 +471,11 @@ impl ContentEncoder {
DeflateEncoder::new(transfer, Compression::default())),
ContentEncoding::Gzip => ContentEncoder::Gzip(
GzEncoder::new(transfer, Compression::default())),
#[cfg(feature="brotli")]
ContentEncoding::Br => ContentEncoder::Br(
BrotliEncoder::new(transfer, 5)),
ContentEncoding::Identity => ContentEncoder::Identity(transfer),
ContentEncoding::Auto => unreachable!()
ContentEncoding::Identity | ContentEncoding::Auto =>
ContentEncoder::Identity(transfer),
}
}
@@ -538,6 +546,7 @@ impl ContentEncoder {
#[inline]
pub fn is_eof(&self) -> bool {
match *self {
#[cfg(feature="brotli")]
ContentEncoder::Br(ref encoder) => encoder.get_ref().is_eof(),
ContentEncoder::Deflate(ref encoder) => encoder.get_ref().is_eof(),
ContentEncoder::Gzip(ref encoder) => encoder.get_ref().is_eof(),
@@ -552,6 +561,7 @@ impl ContentEncoder {
self, ContentEncoder::Identity(TransferEncoding::eof(SharedBytes::empty())));
match encoder {
#[cfg(feature="brotli")]
ContentEncoder::Br(encoder) => {
match encoder.finish() {
Ok(mut writer) => {
@@ -594,6 +604,7 @@ impl ContentEncoder {
#[inline(always)]
pub fn write(&mut self, data: Binary) -> Result<(), io::Error> {
match *self {
#[cfg(feature="brotli")]
ContentEncoder::Br(ref mut encoder) => {
match encoder.write_all(data.as_ref()) {
Ok(_) => Ok(()),

View File

@@ -32,8 +32,10 @@ const MAX_PIPELINED_MESSAGES: usize = 16;
bitflags! {
struct Flags: u8 {
const STARTED = 0b0000_0001;
const ERROR = 0b0000_0010;
const KEEPALIVE = 0b0000_0100;
const SHUTDOWN = 0b0000_1000;
}
}
@@ -94,34 +96,37 @@ impl<T, H> Http1<T, H>
match timer.poll() {
Ok(Async::Ready(_)) => {
trace!("Keep-alive timeout, close connection");
return Ok(Async::Ready(()))
self.flags.insert(Flags::SHUTDOWN);
}
Ok(Async::NotReady) => (),
Err(_) => unreachable!(),
}
}
// shutdown
if self.flags.contains(Flags::SHUTDOWN) {
match self.stream.poll_completed(true) {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(_)) => return Ok(Async::Ready(())),
Err(err) => {
debug!("Error sending data: {}", err);
return Err(())
}
}
}
loop {
match self.poll_io()? {
Async::Ready(true) => (),
Async::Ready(false) => return Ok(Async::Ready(())),
Async::Ready(false) => {
self.flags.insert(Flags::SHUTDOWN);
return self.poll()
},
Async::NotReady => return Ok(Async::NotReady),
}
}
}
fn poll_completed(&mut self, shutdown: bool) -> Result<bool, ()> {
// check stream state
match self.stream.poll_completed(shutdown) {
Ok(Async::Ready(_)) => Ok(true),
Ok(Async::NotReady) => Ok(false),
Err(err) => {
debug!("Error sending data: {}", err);
Err(())
}
}
}
// TODO: refactor
pub fn poll_io(&mut self) -> Poll<bool, ()> {
// read incoming data
@@ -132,6 +137,8 @@ impl<T, H> Http1<T, H>
match self.reader.parse(self.stream.get_mut(),
&mut self.read_buf, &self.settings) {
Ok(Async::Ready(mut req)) => {
self.flags.insert(Flags::STARTED);
// set remote addr
req.set_peer_addr(self.addr);
@@ -192,134 +199,120 @@ impl<T, H> Http1<T, H>
let retry = self.reader.need_read() == PayloadStatus::Read;
loop {
// check in-flight messages
let mut io = false;
let mut idx = 0;
while idx < self.tasks.len() {
let item = &mut self.tasks[idx];
// check in-flight messages
let mut io = false;
let mut idx = 0;
while idx < self.tasks.len() {
let item = &mut self.tasks[idx];
if !io && !item.flags.contains(EntryFlags::EOF) {
// io is corrupted, send buffer
if item.flags.contains(EntryFlags::ERROR) {
if !io && !item.flags.contains(EntryFlags::EOF) {
// io is corrupted, send buffer
if item.flags.contains(EntryFlags::ERROR) {
if let Ok(Async::NotReady) = self.stream.poll_completed(true) {
return Ok(Async::NotReady)
}
return Err(())
}
match item.pipe.poll_io(&mut self.stream) {
Ok(Async::Ready(ready)) => {
// override keep-alive state
if self.stream.keepalive() {
self.flags.insert(Flags::KEEPALIVE);
} else {
self.flags.remove(Flags::KEEPALIVE);
}
// prepare stream for next response
self.stream.reset();
if ready {
item.flags.insert(EntryFlags::EOF | EntryFlags::FINISHED);
} else {
item.flags.insert(EntryFlags::FINISHED);
}
},
// no more IO for this iteration
Ok(Async::NotReady) => {
if self.reader.need_read() == PayloadStatus::Read && !retry {
return Ok(Async::Ready(true));
}
io = true;
}
Err(err) => {
// it is not possible to recover from error
// during pipe handling, so just drop connection
error!("Unhandled error: {}", err);
item.flags.insert(EntryFlags::ERROR);
// check stream state, we still can have valid data in buffer
if let Ok(Async::NotReady) = self.stream.poll_completed(true) {
return Ok(Async::NotReady)
}
return Err(())
}
match item.pipe.poll_io(&mut self.stream) {
Ok(Async::Ready(ready)) => {
// override keep-alive state
if self.stream.keepalive() {
self.flags.insert(Flags::KEEPALIVE);
} else {
self.flags.remove(Flags::KEEPALIVE);
}
// prepare stream for next response
self.stream.reset();
if ready {
item.flags.insert(EntryFlags::EOF | EntryFlags::FINISHED);
} else {
item.flags.insert(EntryFlags::FINISHED);
}
},
// no more IO for this iteration
Ok(Async::NotReady) => {
if self.reader.need_read() == PayloadStatus::Read && !retry {
return Ok(Async::Ready(true));
}
io = true;
}
Err(err) => {
// it is not possible to recover from error
// during pipe handling, so just drop connection
error!("Unhandled error: {}", err);
item.flags.insert(EntryFlags::ERROR);
// check stream state, we still can have valid data in buffer
if let Ok(Async::NotReady) = self.stream.poll_completed(true) {
return Ok(Async::NotReady)
}
return Err(())
}
}
} else if !item.flags.contains(EntryFlags::FINISHED) {
match item.pipe.poll() {
Ok(Async::NotReady) => (),
Ok(Async::Ready(_)) => item.flags.insert(EntryFlags::FINISHED),
Err(err) => {
item.flags.insert(EntryFlags::ERROR);
error!("Unhandled error: {}", err);
}
}
} else if !item.flags.contains(EntryFlags::FINISHED) {
match item.pipe.poll() {
Ok(Async::NotReady) => (),
Ok(Async::Ready(_)) => item.flags.insert(EntryFlags::FINISHED),
Err(err) => {
item.flags.insert(EntryFlags::ERROR);
error!("Unhandled error: {}", err);
}
}
idx += 1;
}
idx += 1;
}
// cleanup finished tasks
let mut popped = false;
while !self.tasks.is_empty() {
if self.tasks[0].flags.contains(EntryFlags::EOF | EntryFlags::FINISHED) {
popped = true;
self.tasks.pop_front();
} else {
break
}
}
if need_read && popped {
return self.poll_io()
// cleanup finished tasks
let mut popped = false;
while !self.tasks.is_empty() {
if self.tasks[0].flags.contains(EntryFlags::EOF | EntryFlags::FINISHED) {
popped = true;
self.tasks.pop_front();
} else {
break
}
}
if need_read && popped {
return self.poll_io()
}
// no keep-alive
if !self.flags.contains(Flags::KEEPALIVE) && self.tasks.is_empty() {
// check stream state
if !self.poll_completed(true)? {
return Ok(Async::NotReady)
// check stream state
if self.flags.contains(Flags::STARTED) {
match self.stream.poll_completed(false) {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => {
debug!("Error sending data: {}", err);
return Err(())
}
_ => (),
}
}
// deal with keep-alive
if self.tasks.is_empty() {
// no keep-alive situations
if (self.flags.contains(Flags::ERROR)
|| !self.flags.contains(Flags::KEEPALIVE)
|| !self.settings.keep_alive_enabled()) &&
self.flags.contains(Flags::STARTED)
{
return Ok(Async::Ready(false))
}
// start keep-alive timer, this also is slow request timeout
if self.tasks.is_empty() {
// check stream state
if self.flags.contains(Flags::ERROR) {
return Ok(Async::Ready(false))
}
if self.settings.keep_alive_enabled() {
let keep_alive = self.settings.keep_alive();
if keep_alive > 0 && self.flags.contains(Flags::KEEPALIVE) {
if self.keepalive_timer.is_none() {
trace!("Start keep-alive timer");
let mut to = Timeout::new(
Duration::new(keep_alive, 0), Arbiter::handle()).unwrap();
// register timeout
let _ = to.poll();
self.keepalive_timer = Some(to);
}
} else {
// check stream state
if !self.poll_completed(true)? {
return Ok(Async::NotReady)
}
// keep-alive is disabled, drop connection
return Ok(Async::Ready(false))
}
} else if !self.poll_completed(false)? ||
self.flags.contains(Flags::KEEPALIVE) {
// check stream state or
// if keep-alive unset, rely on operating system
return Ok(Async::NotReady)
} else {
return Ok(Async::Ready(false))
}
} else {
self.poll_completed(false)?;
return Ok(Async::NotReady)
// start keep-alive timer
let keep_alive = self.settings.keep_alive();
if self.keepalive_timer.is_none() && keep_alive > 0 {
trace!("Start keep-alive timer");
let mut timer = Timeout::new(
Duration::new(keep_alive, 0), Arbiter::handle()).unwrap();
// register timer
let _ = timer.poll();
self.keepalive_timer = Some(timer);
}
}
Ok(Async::NotReady)
}
}
@@ -868,7 +861,7 @@ mod tests {
use httpmessage::HttpMessage;
use application::HttpApplication;
use server::settings::WorkerSettings;
use server::IoStream;
use server::{IoStream, KeepAlive};
struct Buffer {
buf: Bytes,
@@ -939,7 +932,8 @@ mod tests {
macro_rules! parse_ready {
($e:expr) => ({
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), None);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(), KeepAlive::Os);
match Reader::new().parse($e, &mut BytesMut::new(), &settings) {
Ok(Async::Ready(req)) => req,
Ok(_) => panic!("Eof during parsing http request"),
@@ -961,7 +955,8 @@ mod tests {
macro_rules! expect_parse_err {
($e:expr) => ({
let mut buf = BytesMut::new();
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), None);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(), KeepAlive::Os);
match Reader::new().parse($e, &mut buf, &settings) {
Err(err) => match err {
@@ -979,7 +974,8 @@ mod tests {
fn test_parse() {
let mut buf = Buffer::new("GET /test HTTP/1.1\r\n\r\n");
let mut readbuf = BytesMut::new();
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), None);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(), KeepAlive::Os);
let mut reader = Reader::new();
match reader.parse(&mut buf, &mut readbuf, &settings) {
@@ -996,7 +992,8 @@ mod tests {
fn test_parse_partial() {
let mut buf = Buffer::new("PUT /test HTTP/1");
let mut readbuf = BytesMut::new();
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), None);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(), KeepAlive::Os);
let mut reader = Reader::new();
match reader.parse(&mut buf, &mut readbuf, &settings) {
@@ -1019,7 +1016,8 @@ mod tests {
fn test_parse_post() {
let mut buf = Buffer::new("POST /test2 HTTP/1.0\r\n\r\n");
let mut readbuf = BytesMut::new();
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), None);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(), KeepAlive::Os);
let mut reader = Reader::new();
match reader.parse(&mut buf, &mut readbuf, &settings) {
@@ -1036,7 +1034,8 @@ mod tests {
fn test_parse_body() {
let mut buf = Buffer::new("GET /test HTTP/1.1\r\nContent-Length: 4\r\n\r\nbody");
let mut readbuf = BytesMut::new();
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), None);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(), KeepAlive::Os);
let mut reader = Reader::new();
match reader.parse(&mut buf, &mut readbuf, &settings) {
@@ -1055,7 +1054,8 @@ mod tests {
let mut buf = Buffer::new(
"\r\nGET /test HTTP/1.1\r\nContent-Length: 4\r\n\r\nbody");
let mut readbuf = BytesMut::new();
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), None);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(), KeepAlive::Os);
let mut reader = Reader::new();
match reader.parse(&mut buf, &mut readbuf, &settings) {
@@ -1073,7 +1073,8 @@ mod tests {
fn test_parse_partial_eof() {
let mut buf = Buffer::new("GET /test HTTP/1.1\r\n");
let mut readbuf = BytesMut::new();
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), None);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(), KeepAlive::Os);
let mut reader = Reader::new();
not_ready!{ reader.parse(&mut buf, &mut readbuf, &settings) }
@@ -1093,7 +1094,8 @@ mod tests {
fn test_headers_split_field() {
let mut buf = Buffer::new("GET /test HTTP/1.1\r\n");
let mut readbuf = BytesMut::new();
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), None);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(), KeepAlive::Os);
let mut reader = Reader::new();
not_ready!{ reader.parse(&mut buf, &mut readbuf, &settings) }
@@ -1123,7 +1125,8 @@ mod tests {
Set-Cookie: c1=cookie1\r\n\
Set-Cookie: c2=cookie2\r\n\r\n");
let mut readbuf = BytesMut::new();
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), None);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(), KeepAlive::Os);
let mut reader = Reader::new();
match reader.parse(&mut buf, &mut readbuf, &settings) {
@@ -1358,7 +1361,8 @@ mod tests {
"GET /test HTTP/1.1\r\n\
transfer-encoding: chunked\r\n\r\n");
let mut readbuf = BytesMut::new();
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), None);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(), KeepAlive::Os);
let mut reader = Reader::new();
let mut req = reader_parse_ready!(reader.parse(&mut buf, &mut readbuf, &settings));
@@ -1379,7 +1383,8 @@ mod tests {
"GET /test HTTP/1.1\r\n\
transfer-encoding: chunked\r\n\r\n");
let mut readbuf = BytesMut::new();
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), None);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(), KeepAlive::Os);
let mut reader = Reader::new();
@@ -1408,10 +1413,12 @@ mod tests {
"GET /test HTTP/1.1\r\n\
transfer-encoding: chunked\r\n\r\n");
let mut readbuf = BytesMut::new();
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), None);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(), KeepAlive::Os);
let mut reader = Reader::new();
let mut req = reader_parse_ready!(reader.parse(&mut buf, &mut readbuf, &settings));
let _ = req.payload_mut().set_read_buffer_capacity(0);
assert!(req.chunked().unwrap());
assert!(!req.payload().eof());
@@ -1458,7 +1465,8 @@ mod tests {
"GET /test HTTP/1.1\r\n\
transfer-encoding: chunked\r\n\r\n");
let mut readbuf = BytesMut::new();
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), None);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(), KeepAlive::Os);
let mut reader = Reader::new();
let mut req = reader_parse_ready!(reader.parse(&mut buf, &mut readbuf, &settings));

View File

@@ -34,6 +34,7 @@ pub(crate) struct H1Writer<T: AsyncWrite> {
written: u64,
headers_size: u32,
buffer: SharedBytes,
buffer_capacity: usize,
}
impl<T: AsyncWrite> H1Writer<T> {
@@ -45,6 +46,7 @@ impl<T: AsyncWrite> H1Writer<T> {
written: 0,
headers_size: 0,
buffer: buf,
buffer_capacity: 0,
stream,
}
}
@@ -66,27 +68,22 @@ impl<T: AsyncWrite> H1Writer<T> {
self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE)
}
fn write_to_stream(&mut self) -> io::Result<WriterState> {
while !self.buffer.is_empty() {
match self.stream.write(self.buffer.as_ref()) {
fn write_data(&mut self, data: &[u8]) -> io::Result<usize> {
let mut written = 0;
while written < data.len() {
match self.stream.write(&data[written..]) {
Ok(0) => {
self.disconnected();
return Ok(WriterState::Done);
},
Ok(n) => {
let _ = self.buffer.split_to(n);
return Err(io::Error::new(io::ErrorKind::WriteZero, ""))
},
Ok(n) => written += n,
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
return Ok(WriterState::Pause)
} else {
return Ok(WriterState::Done)
}
return Ok(written)
}
Err(err) => return Err(err),
}
}
Ok(WriterState::Done)
Ok(written)
}
}
@@ -199,6 +196,9 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
self.written = bytes.len() as u64;
self.encoder.write(bytes)?;
} else {
// capacity, makes sense only for streaming or actor
self.buffer_capacity = msg.write_buffer_capacity();
msg.replace_body(body);
}
Ok(WriterState::Done)
@@ -211,18 +211,10 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
// shortcut for upgraded connection
if self.flags.contains(Flags::UPGRADE) {
if self.buffer.is_empty() {
match self.stream.write(payload.as_ref()) {
Ok(0) => {
self.disconnected();
return Ok(WriterState::Done);
},
Ok(n) => if payload.len() < n {
self.buffer.extend_from_slice(&payload.as_ref()[n..])
},
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
return Ok(WriterState::Done)
}
Err(err) => return Err(err),
let n = self.write_data(payload.as_ref())?;
if payload.len() < n {
self.buffer.extend_from_slice(&payload.as_ref()[n..]);
return Ok(WriterState::Done);
}
} else {
self.buffer.extend(payload);
@@ -237,7 +229,7 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
}
}
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
if self.buffer.len() > self.buffer_capacity {
Ok(WriterState::Pause)
} else {
Ok(WriterState::Done)
@@ -259,16 +251,18 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
#[inline]
fn poll_completed(&mut self, shutdown: bool) -> Poll<(), io::Error> {
match self.write_to_stream() {
Ok(WriterState::Done) => {
if shutdown {
self.stream.shutdown()
} else {
Ok(Async::Ready(()))
}
},
Ok(WriterState::Pause) => Ok(Async::NotReady),
Err(err) => Err(err)
if !self.buffer.is_empty() {
let buf: &[u8] = unsafe{mem::transmute(self.buffer.as_ref())};
let written = self.write_data(buf)?;
let _ = self.buffer.split_to(written);
if self.buffer.len() > self.buffer_capacity {
return Ok(Async::NotReady)
}
}
if shutdown {
self.stream.shutdown()
} else {
Ok(Async::Ready(()))
}
}
}

View File

@@ -26,7 +26,7 @@ use payload::{Payload, PayloadWriter, PayloadStatus};
use super::h2writer::H2Writer;
use super::encoding::PayloadType;
use super::settings::WorkerSettings;
use super::{HttpHandler, HttpHandlerTask};
use super::{HttpHandler, HttpHandlerTask, Writer};
bitflags! {
struct Flags: u8 {
@@ -109,22 +109,27 @@ impl<T, H> Http2<T, H>
loop {
match item.task.poll_io(&mut item.stream) {
Ok(Async::Ready(ready)) => {
item.flags.insert(EntryFlags::EOF);
if ready {
item.flags.insert(EntryFlags::FINISHED);
item.flags.insert(
EntryFlags::EOF | EntryFlags::FINISHED);
} else {
item.flags.insert(EntryFlags::EOF);
}
not_ready = false;
},
Ok(Async::NotReady) => {
if item.payload.need_read() == PayloadStatus::Read && !retry
if item.payload.need_read() == PayloadStatus::Read
&& !retry
{
continue
}
},
Err(err) => {
error!("Unhandled error: {}", err);
item.flags.insert(EntryFlags::EOF);
item.flags.insert(EntryFlags::ERROR);
item.flags.insert(
EntryFlags::EOF |
EntryFlags::ERROR |
EntryFlags::WRITE_DONE);
item.stream.reset(Reason::INTERNAL_ERROR);
}
}
@@ -138,18 +143,32 @@ impl<T, H> Http2<T, H>
item.flags.insert(EntryFlags::FINISHED);
},
Err(err) => {
item.flags.insert(EntryFlags::ERROR);
item.flags.insert(EntryFlags::FINISHED);
item.flags.insert(
EntryFlags::ERROR | EntryFlags::WRITE_DONE |
EntryFlags::FINISHED);
error!("Unhandled error: {}", err);
}
}
}
if !item.flags.contains(EntryFlags::WRITE_DONE) {
match item.stream.poll_completed(false) {
Ok(Async::NotReady) => (),
Ok(Async::Ready(_)) => {
not_ready = false;
item.flags.insert(EntryFlags::WRITE_DONE);
}
Err(_err) => {
item.flags.insert(EntryFlags::ERROR);
}
}
}
}
// cleanup finished tasks
while !self.tasks.is_empty() {
if self.tasks[0].flags.contains(EntryFlags::EOF) &&
self.tasks[0].flags.contains(EntryFlags::FINISHED) ||
self.tasks[0].flags.contains(EntryFlags::WRITE_DONE) ||
self.tasks[0].flags.contains(EntryFlags::ERROR)
{
self.tasks.pop_front();
@@ -251,6 +270,7 @@ bitflags! {
const REOF = 0b0000_0010;
const ERROR = 0b0000_0100;
const FINISHED = 0b0000_1000;
const WRITE_DONE = 0b0001_0000;
}
}

View File

@@ -24,6 +24,7 @@ bitflags! {
const STARTED = 0b0000_0001;
const DISCONNECTED = 0b0000_0010;
const EOF = 0b0000_0100;
const RESERVED = 0b0000_1000;
}
}
@@ -34,6 +35,7 @@ pub(crate) struct H2Writer {
flags: Flags,
written: u64,
buffer: SharedBytes,
buffer_capacity: usize,
}
impl H2Writer {
@@ -46,6 +48,7 @@ impl H2Writer {
flags: Flags::empty(),
written: 0,
buffer: buf,
buffer_capacity: 0,
}
}
@@ -54,55 +57,6 @@ impl H2Writer {
stream.send_reset(reason)
}
}
fn write_to_stream(&mut self) -> io::Result<WriterState> {
if !self.flags.contains(Flags::STARTED) {
return Ok(WriterState::Done)
}
if let Some(ref mut stream) = self.stream {
if self.buffer.is_empty() {
if self.flags.contains(Flags::EOF) {
let _ = stream.send_data(Bytes::new(), true);
}
return Ok(WriterState::Done)
}
loop {
match stream.poll_capacity() {
Ok(Async::NotReady) => {
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
return Ok(WriterState::Pause)
} else {
return Ok(WriterState::Done)
}
}
Ok(Async::Ready(None)) => {
return Ok(WriterState::Done)
}
Ok(Async::Ready(Some(cap))) => {
let len = self.buffer.len();
let bytes = self.buffer.split_to(cmp::min(cap, len));
let eof = self.buffer.is_empty() && self.flags.contains(Flags::EOF);
self.written += bytes.len() as u64;
if let Err(err) = stream.send_data(bytes.freeze(), eof) {
return Err(io::Error::new(io::ErrorKind::Other, err))
} else if !self.buffer.is_empty() {
let cap = cmp::min(self.buffer.len(), CHUNK_SIZE);
stream.reserve_capacity(cap);
} else {
return Ok(WriterState::Pause)
}
}
Err(_) => {
return Err(io::Error::new(io::ErrorKind::Other, ""))
}
}
}
}
Ok(WriterState::Done)
}
}
impl Writer for H2Writer {
@@ -111,8 +65,11 @@ impl Writer for H2Writer {
self.written
}
fn start(&mut self, req: &mut HttpInnerMessage, msg: &mut HttpResponse, encoding: ContentEncoding)
-> io::Result<WriterState> {
fn start(&mut self,
req: &mut HttpInnerMessage,
msg: &mut HttpResponse,
encoding: ContentEncoding) -> io::Result<WriterState>
{
// prepare response
self.flags.insert(Flags::STARTED);
self.encoder = ContentEncoder::for_server(self.buffer.clone(), req, msg, encoding);
@@ -167,11 +124,13 @@ impl Writer for H2Writer {
self.written = bytes.len() as u64;
self.encoder.write(bytes)?;
if let Some(ref mut stream) = self.stream {
self.flags.insert(Flags::RESERVED);
stream.reserve_capacity(cmp::min(self.buffer.len(), CHUNK_SIZE));
}
Ok(WriterState::Pause)
} else {
msg.replace_body(body);
self.buffer_capacity = msg.write_buffer_capacity();
Ok(WriterState::Done)
}
}
@@ -189,7 +148,7 @@ impl Writer for H2Writer {
}
}
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
if self.buffer.len() > self.buffer_capacity {
Ok(WriterState::Pause)
} else {
Ok(WriterState::Done)
@@ -211,10 +170,41 @@ impl Writer for H2Writer {
}
fn poll_completed(&mut self, _shutdown: bool) -> Poll<(), io::Error> {
match self.write_to_stream() {
Ok(WriterState::Done) => Ok(Async::Ready(())),
Ok(WriterState::Pause) => Ok(Async::NotReady),
Err(err) => Err(err)
if !self.flags.contains(Flags::STARTED) {
return Ok(Async::NotReady);
}
if let Some(ref mut stream) = self.stream {
// reserve capacity
if !self.flags.contains(Flags::RESERVED) && !self.buffer.is_empty() {
self.flags.insert(Flags::RESERVED);
stream.reserve_capacity(cmp::min(self.buffer.len(), CHUNK_SIZE));
}
loop {
match stream.poll_capacity() {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
Ok(Async::Ready(Some(cap))) => {
let len = self.buffer.len();
let bytes = self.buffer.split_to(cmp::min(cap, len));
let eof = self.buffer.is_empty() && self.flags.contains(Flags::EOF);
self.written += bytes.len() as u64;
if let Err(e) = stream.send_data(bytes.freeze(), eof) {
return Err(io::Error::new(io::ErrorKind::Other, e))
} else if !self.buffer.is_empty() {
let cap = cmp::min(self.buffer.len(), CHUNK_SIZE);
stream.reserve_capacity(cap);
} else {
self.flags.remove(Flags::RESERVED);
return Ok(Async::NotReady)
}
}
Err(e) => return Err(io::Error::new(io::ErrorKind::Other, e)),
}
}
}
Ok(Async::NotReady)
}
}

View File

@@ -31,6 +31,35 @@ use httpresponse::HttpResponse;
/// max buffer size 64k
pub(crate) const MAX_WRITE_BUFFER_SIZE: usize = 65_536;
#[derive(Debug, PartialEq, Clone, Copy)]
/// Server keep-alive setting
pub enum KeepAlive {
/// Keep alive in seconds
Timeout(usize),
/// Use `SO_KEEPALIVE` socket option, value in seconds
Tcp(usize),
/// Relay on OS to shutdown tcp connection
Os,
/// Disabled
Disabled,
}
impl From<usize> for KeepAlive {
fn from(keepalive: usize) -> Self {
KeepAlive::Timeout(keepalive)
}
}
impl From<Option<usize>> for KeepAlive {
fn from(keepalive: Option<usize>) -> Self {
if let Some(keepalive) = keepalive {
KeepAlive::Timeout(keepalive)
} else {
KeepAlive::Disabled
}
}
}
/// Pause accepting incoming connections
///
/// If socket contains some pending connection, they might be dropped.

View File

@@ -5,6 +5,7 @@ use std::cell::{Cell, RefCell, RefMut, UnsafeCell};
use futures_cpupool::{Builder, CpuPool};
use helpers;
use super::KeepAlive;
use super::channel::Node;
use super::shared::{SharedBytes, SharedBytesPool};
@@ -97,8 +98,8 @@ impl ServerSettings {
pub(crate) struct WorkerSettings<H> {
h: RefCell<Vec<H>>,
enabled: bool,
keep_alive: u64,
ka_enabled: bool,
bytes: Rc<SharedBytesPool>,
messages: Rc<helpers::SharedMessagePool>,
channels: Cell<usize>,
@@ -106,11 +107,16 @@ pub(crate) struct WorkerSettings<H> {
}
impl<H> WorkerSettings<H> {
pub(crate) fn new(h: Vec<H>, keep_alive: Option<u64>) -> WorkerSettings<H> {
pub(crate) fn new(h: Vec<H>, keep_alive: KeepAlive) -> WorkerSettings<H> {
let (keep_alive, ka_enabled) = match keep_alive {
KeepAlive::Timeout(val) => (val as u64, true),
KeepAlive::Os | KeepAlive::Tcp(_) => (0, true),
KeepAlive::Disabled => (0, false),
};
WorkerSettings {
keep_alive, ka_enabled,
h: RefCell::new(h),
enabled: if let Some(ka) = keep_alive { ka > 0 } else { false },
keep_alive: keep_alive.unwrap_or(0),
bytes: Rc::new(SharedBytesPool::new()),
messages: Rc::new(helpers::SharedMessagePool::new()),
channels: Cell::new(0),
@@ -135,7 +141,7 @@ impl<H> WorkerSettings<H> {
}
pub fn keep_alive_enabled(&self) -> bool {
self.enabled
self.ka_enabled
}
pub fn get_shared_bytes(&self) -> SharedBytes {

View File

@@ -27,7 +27,7 @@ impl SharedBytesPool {
pub fn release_bytes(&self, mut bytes: Rc<BytesMut>) {
let v = &mut self.0.borrow_mut();
if v.len() < 128 {
Rc::get_mut(&mut bytes).unwrap().take();
Rc::get_mut(&mut bytes).unwrap().clear();
v.push_front(bytes);
}
}
@@ -62,7 +62,7 @@ impl SharedBytes {
#[inline(always)]
#[allow(mutable_transmutes)]
#[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref, inline_always))]
pub fn get_mut(&self) -> &mut BytesMut {
pub(crate) fn get_mut(&self) -> &mut BytesMut {
let r: &BytesMut = self.0.as_ref().unwrap().as_ref();
unsafe{mem::transmute(r)}
}

View File

@@ -2,7 +2,6 @@ use std::{io, net, thread};
use std::rc::Rc;
use std::sync::{Arc, mpsc as sync_mpsc};
use std::time::Duration;
use std::collections::HashMap;
use actix::prelude::*;
use actix::actors::signal;
@@ -20,13 +19,12 @@ use native_tls::TlsAcceptor;
use openssl::ssl::{AlpnError, SslAcceptorBuilder};
use helpers;
use super::{IntoHttpHandler, IoStream};
use super::{IntoHttpHandler, IoStream, KeepAlive};
use super::{PauseServer, ResumeServer, StopServer};
use super::channel::{HttpChannel, WrapperStream};
use super::worker::{Conn, Worker, StreamHandlerType, StopWorker};
use super::settings::{ServerSettings, WorkerSettings};
/// An HTTP Server
pub struct HttpServer<H> where H: IntoHttpHandler + 'static
{
@@ -34,15 +32,16 @@ pub struct HttpServer<H> where H: IntoHttpHandler + 'static
threads: usize,
backlog: i32,
host: Option<String>,
keep_alive: Option<u64>,
keep_alive: KeepAlive,
factory: Arc<Fn() -> Vec<H> + Send + Sync>,
#[cfg_attr(feature="cargo-clippy", allow(type_complexity))]
workers: Vec<(usize, Addr<Syn, Worker<H::Handler>>)>,
sockets: HashMap<net::SocketAddr, net::TcpListener>,
sockets: Vec<(net::SocketAddr, net::TcpListener)>,
accept: Vec<(mio::SetReadiness, sync_mpsc::Sender<Command>)>,
exit: bool,
shutdown_timeout: u16,
signals: Option<Addr<Syn, signal::ProcessSignals>>,
no_http2: bool,
no_signals: bool,
}
@@ -78,19 +77,20 @@ impl<H> HttpServer<H> where H: IntoHttpHandler + 'static
let f = move || {
(factory)().into_iter().collect()
};
HttpServer{ h: None,
threads: num_cpus::get(),
backlog: 2048,
host: None,
keep_alive: None,
keep_alive: KeepAlive::Os,
factory: Arc::new(f),
workers: Vec::new(),
sockets: HashMap::new(),
sockets: Vec::new(),
accept: Vec::new(),
exit: false,
shutdown_timeout: 30,
signals: None,
no_http2: false,
no_signals: false,
}
}
@@ -124,15 +124,9 @@ impl<H> HttpServer<H> where H: IntoHttpHandler + 'static
/// Set server keep-alive setting.
///
/// By default keep alive is enabled.
///
/// - `Some(75)` - enable
///
/// - `Some(0)` - disable
///
/// - `None` - use `SO_KEEPALIVE` socket option
pub fn keep_alive(mut self, val: Option<u64>) -> Self {
self.keep_alive = val;
/// By default keep alive is set to a `Os`.
pub fn keep_alive<T: Into<KeepAlive>>(mut self, val: T) -> Self {
self.keep_alive = val.into();
self
}
@@ -178,9 +172,15 @@ impl<H> HttpServer<H> where H: IntoHttpHandler + 'static
self
}
/// Disable `HTTP/2` support
pub fn no_http2(mut self) -> Self {
self.no_http2 = true;
self
}
/// Get addresses of bound sockets.
pub fn addrs(&self) -> Vec<net::SocketAddr> {
self.sockets.keys().cloned().collect()
self.sockets.iter().map(|s| s.0).collect()
}
/// Use listener for accepting incoming connection requests
@@ -188,7 +188,7 @@ impl<H> HttpServer<H> where H: IntoHttpHandler + 'static
/// HttpServer does not change any configuration for TcpListener,
/// it needs to be configured before passing it to listen() method.
pub fn listen(mut self, lst: net::TcpListener) -> Self {
self.sockets.insert(lst.local_addr().unwrap(), lst);
self.sockets.push((lst.local_addr().unwrap(), lst));
self
}
@@ -202,7 +202,7 @@ impl<H> HttpServer<H> where H: IntoHttpHandler + 'static
match create_tcp_listener(addr, self.backlog) {
Ok(lst) => {
succ = true;
self.sockets.insert(lst.local_addr().unwrap(), lst);
self.sockets.push((lst.local_addr().unwrap(), lst));
},
Err(e) => err = Some(e),
}
@@ -295,7 +295,7 @@ impl<H: IntoHttpHandler> HttpServer<H>
} else {
let (tx, rx) = mpsc::unbounded();
let addrs: Vec<(net::SocketAddr, net::TcpListener)> =
self.sockets.drain().collect();
self.sockets.drain(..).collect();
let settings = ServerSettings::new(Some(addrs[0].0), &self.host, false);
let workers = self.start_workers(&settings, &StreamHandlerType::Normal);
let info = Info{addr: addrs[0].0, handler: StreamHandlerType::Normal};
@@ -364,7 +364,7 @@ impl<H: IntoHttpHandler> HttpServer<H>
Err(io::Error::new(io::ErrorKind::Other, "No socket addresses are bound"))
} else {
let (tx, rx) = mpsc::unbounded();
let addrs: Vec<(net::SocketAddr, net::TcpListener)> = self.sockets.drain().collect();
let addrs: Vec<(net::SocketAddr, net::TcpListener)> = self.sockets.drain(..).collect();
let settings = ServerSettings::new(Some(addrs[0].0), &self.host, false);
let workers = self.start_workers(
&settings, &StreamHandlerType::Tls(acceptor.clone()));
@@ -404,19 +404,21 @@ impl<H: IntoHttpHandler> HttpServer<H>
Err(io::Error::new(io::ErrorKind::Other, "No socket addresses are bound"))
} else {
// alpn support
builder.set_alpn_protos(b"\x02h2\x08http/1.1")?;
builder.set_alpn_select_callback(|_, protos| {
const H2: &[u8] = b"\x02h2";
if protos.windows(3).any(|window| window == H2) {
Ok(b"h2")
} else {
Err(AlpnError::NOACK)
}
});
if !self.no_http2 {
builder.set_alpn_protos(b"\x02h2\x08http/1.1")?;
builder.set_alpn_select_callback(|_, protos| {
const H2: &[u8] = b"\x02h2";
if protos.windows(3).any(|window| window == H2) {
Ok(b"h2")
} else {
Err(AlpnError::NOACK)
}
});
}
let (tx, rx) = mpsc::unbounded();
let acceptor = builder.build();
let addrs: Vec<(net::SocketAddr, net::TcpListener)> = self.sockets.drain().collect();
let addrs: Vec<(net::SocketAddr, net::TcpListener)> = self.sockets.drain(..).collect();
let settings = ServerSettings::new(Some(addrs[0].0), &self.host, false);
let workers = self.start_workers(
&settings, &StreamHandlerType::Alpn(acceptor.clone()));
@@ -458,7 +460,7 @@ impl<H: IntoHttpHandler> HttpServer<H>
if !self.sockets.is_empty() {
let addrs: Vec<(net::SocketAddr, net::TcpListener)> =
self.sockets.drain().collect();
self.sockets.drain(..).collect();
let settings = ServerSettings::new(Some(addrs[0].0), &self.host, false);
let workers = self.start_workers(&settings, &StreamHandlerType::Normal);
let info = Info{addr: addrs[0].0, handler: StreamHandlerType::Normal};
@@ -742,6 +744,13 @@ fn start_accept_thread(
workers[next].0, info.clone()));
msg = err.into_inner();
workers.swap_remove(next);
if workers.is_empty() {
error!("No workers");
thread::sleep(sleep);
break
} else if workers.len() <= next {
next = 0;
}
continue
}
}
@@ -749,10 +758,12 @@ fn start_accept_thread(
break
}
},
Err(err) => {
if err.kind() != io::ErrorKind::WouldBlock {
error!("Error accepting connection: {:?}", err);
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock =>
break,
Err(ref e) if connection_error(e) =>
continue,
Err(e) => {
error!("Error accepting connection: {}", e);
// sleep after error
thread::sleep(sleep);
break
@@ -826,3 +837,16 @@ fn create_tcp_listener(addr: net::SocketAddr, backlog: i32) -> io::Result<net::T
builder.bind(addr)?;
Ok(builder.listen(backlog)?)
}
/// This function defines errors that are per-connection. Which basically
/// means that if we get this error from `accept()` system call it means
/// next connection might be ready to be accepted.
///
/// All other errors will incur a timeout before next `accept()` is performed.
/// The timeout is useful to handle resource exhaustion errors like ENFILE
/// and EMFILE. Otherwise, could enter into tight loop.
fn connection_error(e: &io::Error) -> bool {
e.kind() == io::ErrorKind::ConnectionRefused ||
e.kind() == io::ErrorKind::ConnectionAborted ||
e.kind() == io::ErrorKind::ConnectionReset
}

View File

@@ -23,7 +23,7 @@ use actix::*;
use actix::msgs::StopArbiter;
use helpers;
use server::HttpHandler;
use server::{HttpHandler, KeepAlive};
use server::channel::HttpChannel;
use server::settings::WorkerSettings;
@@ -48,21 +48,30 @@ impl Message for StopWorker {
/// Http worker
///
/// Worker accepts Socket objects via unbounded channel and start requests processing.
pub(crate) struct Worker<H> where H: HttpHandler + 'static {
pub(crate)
struct Worker<H> where H: HttpHandler + 'static {
settings: Rc<WorkerSettings<H>>,
hnd: Handle,
handler: StreamHandlerType,
tcp_ka: Option<time::Duration>,
}
impl<H: HttpHandler + 'static> Worker<H> {
pub(crate) fn new(h: Vec<H>, handler: StreamHandlerType, keep_alive: Option<u64>)
pub(crate) fn new(h: Vec<H>, handler: StreamHandlerType, keep_alive: KeepAlive)
-> Worker<H>
{
let tcp_ka = if let KeepAlive::Tcp(val) = keep_alive {
Some(time::Duration::new(val as u64, 0))
} else {
None
};
Worker {
settings: Rc::new(WorkerSettings::new(h, keep_alive)),
hnd: Arbiter::handle().clone(),
handler,
tcp_ka,
}
}
@@ -106,9 +115,7 @@ impl<H> Handler<Conn<net::TcpStream>> for Worker<H>
fn handle(&mut self, msg: Conn<net::TcpStream>, _: &mut Context<Self>)
{
if !self.settings.keep_alive_enabled() &&
msg.io.set_keepalive(Some(time::Duration::new(75, 0))).is_err()
{
if self.tcp_ka.is_some() && msg.io.set_keepalive(self.tcp_ka).is_err() {
error!("Can not set socket keep-alive option");
}
self.handler.handle(Rc::clone(&self.settings), &self.hnd, msg);
@@ -197,7 +204,8 @@ impl StreamHandlerType {
} else {
false
};
Arbiter::handle().spawn(HttpChannel::new(h, io, peer, http2));
Arbiter::handle().spawn(
HttpChannel::new(h, io, peer, http2));
},
Err(err) =>
trace!("Error during handling tls connection: {}", err),

View File

@@ -87,9 +87,12 @@ impl TestServer {
let sys = System::new("actix-test-server");
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
let local_addr = tcp.local_addr().unwrap();
let tcp = TcpListener::from_listener(tcp, &local_addr, Arbiter::handle()).unwrap();
let tcp = TcpListener::from_listener(
tcp, &local_addr, Arbiter::handle()).unwrap();
HttpServer::new(factory).disable_signals().start_incoming(tcp.incoming(), false);
HttpServer::new(factory)
.disable_signals()
.start_incoming(tcp.incoming(), false);
tx.send((Arbiter::system(), local_addr)).unwrap();
let _ = sys.run();

View File

@@ -2,17 +2,18 @@
use std::{fmt, io, str};
use std::rc::Rc;
use std::cell::UnsafeCell;
use std::time::Duration;
use base64;
use rand;
use bytes::Bytes;
use cookie::Cookie;
use byteorder::{ByteOrder, NetworkEndian};
use http::{HttpTryFrom, StatusCode, Error as HttpError};
use http::header::{self, HeaderName, HeaderValue};
use sha1::Sha1;
use futures::{Async, Future, Poll, Stream};
use futures::unsync::mpsc::{unbounded, UnboundedSender};
use byteorder::{ByteOrder, NetworkEndian};
use actix::prelude::*;
@@ -192,6 +193,14 @@ impl Client {
self
}
/// Set write buffer capacity
///
/// Default buffer capacity is 32kb
pub fn write_buffer_capacity(mut self, cap: usize) -> Self {
self.request.write_buffer_capacity(cap);
self
}
/// Set request header
pub fn header<K, V>(mut self, key: K, value: V) -> Self
where HeaderName: HttpTryFrom<K>, V: IntoHeaderValue
@@ -291,9 +300,32 @@ impl ClientHandshake {
request: None,
tx: None,
error: Some(err),
max_size: 0
max_size: 0,
}
}
/// Set handshake timeout
///
/// Handshake timeout is a total time before handshake should be completed.
/// Default value is 5 seconds.
pub fn timeout(mut self, timeout: Duration) -> Self {
if let Some(request) = self.request.take() {
self.request = Some(request.timeout(timeout));
}
self
}
/// Set connection timeout
///
/// Connection timeout includes resolving hostname and actual connection to
/// the host.
/// Default value is 1 second.
pub fn conn_timeout(mut self, timeout: Duration) -> Self {
if let Some(request) = self.request.take() {
self.request = Some(request.conn_timeout(timeout));
}
self
}
}
impl Future for ClientHandshake {

View File

@@ -1,4 +1,4 @@
use std::{fmt, mem};
use std::{fmt, mem, ptr};
use std::iter::FromIterator;
use bytes::{Bytes, BytesMut, BufMut};
use byteorder::{ByteOrder, BigEndian, NetworkEndian};
@@ -15,11 +15,8 @@ use ws::mask::apply_mask;
/// A struct representing a `WebSocket` frame.
#[derive(Debug)]
pub(crate) struct Frame {
pub struct Frame {
finished: bool,
rsv1: bool,
rsv2: bool,
rsv3: bool,
opcode: OpCode,
payload: Binary,
}
@@ -51,9 +48,11 @@ impl Frame {
Frame::message(payload, OpCode::Close, true, genmask)
}
/// Parse the input stream into a frame.
pub fn parse<S>(pl: &mut PayloadHelper<S>, server: bool, max_size: usize)
-> Poll<Option<Frame>, ProtocolError>
#[cfg_attr(feature="cargo-clippy", allow(type_complexity))]
fn read_copy_md<S>(pl: &mut PayloadHelper<S>,
server: bool,
max_size: usize
) -> Poll<Option<(usize, bool, OpCode, usize, Option<u32>)>, ProtocolError>
where S: Stream<Item=Bytes, Error=PayloadError>
{
let mut idx = 2;
@@ -74,12 +73,14 @@ impl Frame {
return Err(ProtocolError::MaskedFrame)
}
let rsv1 = first & 0x40 != 0;
let rsv2 = first & 0x20 != 0;
let rsv3 = first & 0x10 != 0;
// Op code
let opcode = OpCode::from(first & 0x0F);
let len = second & 0x7F;
if let OpCode::Bad = opcode {
return Err(ProtocolError::InvalidOpcode(first & 0x0F))
}
let len = second & 0x7F;
let length = if len == 126 {
let buf = match pl.copy(4)? {
Async::Ready(Some(buf)) => buf,
@@ -114,28 +115,129 @@ impl Frame {
Async::NotReady => return Ok(Async::NotReady),
};
let mut mask_bytes = [0u8; 4];
mask_bytes.copy_from_slice(&buf[idx..idx+4]);
let mask: &[u8] = &buf[idx..idx+4];
let mask_u32: u32 = unsafe {ptr::read_unaligned(mask.as_ptr() as *const u32)};
idx += 4;
Some(mask_bytes)
Some(mask_u32)
} else {
None
};
let mut data = match pl.readexactly(idx + length)? {
Async::Ready(Some(buf)) => buf,
Async::Ready(None) => return Ok(Async::Ready(None)),
Async::NotReady => return Ok(Async::NotReady),
};
Ok(Async::Ready(Some((idx, finished, opcode, length, mask))))
}
// get body
data.split_to(idx);
fn read_chunk_md(chunk: &[u8], server: bool, max_size: usize)
-> Poll<(usize, bool, OpCode, usize, Option<u32>), ProtocolError>
{
let chunk_len = chunk.len();
let mut idx = 2;
if chunk_len < 2 {
return Ok(Async::NotReady)
}
let first = chunk[0];
let second = chunk[1];
let finished = first & 0x80 != 0;
// check masking
let masked = second & 0x80 != 0;
if !masked && server {
return Err(ProtocolError::UnmaskedFrame)
} else if masked && !server {
return Err(ProtocolError::MaskedFrame)
}
// Op code
let opcode = OpCode::from(first & 0x0F);
// Disallow bad opcode
if let OpCode::Bad = opcode {
return Err(ProtocolError::InvalidOpcode(first & 0x0F))
}
let len = second & 0x7F;
let length = if len == 126 {
if chunk_len < 4 {
return Ok(Async::NotReady)
}
let len = NetworkEndian::read_uint(&chunk[idx..], 2) as usize;
idx += 2;
len
} else if len == 127 {
if chunk_len < 10 {
return Ok(Async::NotReady)
}
let len = NetworkEndian::read_uint(&chunk[idx..], 8) as usize;
idx += 8;
len
} else {
len as usize
};
// check for max allowed size
if length > max_size {
return Err(ProtocolError::Overflow)
}
let mask = if server {
if chunk_len < idx + 4 {
return Ok(Async::NotReady)
}
let mask: &[u8] = &chunk[idx..idx+4];
let mask_u32: u32 = unsafe {ptr::read_unaligned(mask.as_ptr() as *const u32)};
idx += 4;
Some(mask_u32)
} else {
None
};
Ok(Async::Ready((idx, finished, opcode, length, mask)))
}
/// Parse the input stream into a frame.
pub fn parse<S>(pl: &mut PayloadHelper<S>, server: bool, max_size: usize)
-> Poll<Option<Frame>, ProtocolError>
where S: Stream<Item=Bytes, Error=PayloadError>
{
// try to parse ws frame md from one chunk
let result = match pl.get_chunk()? {
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(None) => return Ok(Async::Ready(None)),
Async::Ready(Some(chunk)) => Frame::read_chunk_md(chunk, server, max_size)?,
};
let (idx, finished, opcode, length, mask) = match result {
// we may need to join several chunks
Async::NotReady => match Frame::read_copy_md(pl, server, max_size)? {
Async::Ready(Some(item)) => item,
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(None) => return Ok(Async::Ready(None)),
},
Async::Ready(item) => item,
};
match pl.can_read(idx + length)? {
Async::Ready(Some(true)) => (),
Async::Ready(None) => return Ok(Async::Ready(None)),
Async::Ready(Some(false)) | Async::NotReady => return Ok(Async::NotReady),
}
// remove prefix
pl.drop_payload(idx);
// no need for body
if length == 0 {
return Ok(Async::Ready(Some(Frame {
finished, opcode, payload: Binary::from("") })));
}
let data = match pl.read_exact(length)? {
Async::Ready(Some(buf)) => buf,
Async::Ready(None) => return Ok(Async::Ready(None)),
Async::NotReady => panic!(),
};
// control frames must have length <= 125
match opcode {
OpCode::Ping | OpCode::Pong if length > 125 => {
@@ -149,12 +251,14 @@ impl Frame {
}
// unmask
if let Some(ref mask) = mask {
apply_mask(&mut data, mask);
if let Some(mask) = mask {
#[allow(mutable_transmutes)]
let p: &mut [u8] = unsafe{let ptr: &[u8] = &data; mem::transmute(ptr)};
apply_mask(p, mask);
}
Ok(Async::Ready(Some(Frame {
finished, rsv1, rsv2, rsv3, opcode, payload: data.into() })))
finished, opcode, payload: data.into() })))
}
/// Generate binary representation
@@ -199,13 +303,13 @@ impl Frame {
};
if genmask {
let mask: [u8; 4] = rand::random();
let mask = rand::random::<u32>();
unsafe {
{
let buf_mut = buf.bytes_mut();
buf_mut[..4].copy_from_slice(&mask);
*(buf_mut as *mut _ as *mut u32) = mask;
buf_mut[4..payload_len+4].copy_from_slice(payload.as_ref());
apply_mask(&mut buf_mut[4..], &mask);
apply_mask(&mut buf_mut[4..], mask);
}
buf.advance_mut(payload_len + 4);
}
@@ -221,9 +325,6 @@ impl Default for Frame {
fn default() -> Frame {
Frame {
finished: true,
rsv1: false,
rsv2: false,
rsv3: false,
opcode: OpCode::Close,
payload: Binary::from(&b""[..]),
}
@@ -236,15 +337,11 @@ impl fmt::Display for Frame {
"
<FRAME>
final: {}
reserved: {} {} {}
opcode: {}
payload length: {}
payload: 0x{}
</FRAME>",
self.finished,
self.rsv1,
self.rsv2,
self.rsv3,
self.opcode,
self.payload.len(),
self.payload.as_ref().iter().map(
@@ -282,7 +379,6 @@ mod tests {
let mut buf = PayloadHelper::new(once(Ok(buf.freeze())));
let frame = extract(Frame::parse(&mut buf, false, 1024));
println!("FRAME: {}", frame);
assert!(!frame.finished);
assert_eq!(frame.opcode, OpCode::Text);
assert_eq!(frame.payload.as_ref(), &b"1"[..]);

View File

@@ -5,7 +5,7 @@ use std::ptr::copy_nonoverlapping;
/// Mask/unmask a frame.
#[inline]
pub fn apply_mask(buf: &mut [u8], mask: &[u8; 4]) {
pub fn apply_mask(buf: &mut [u8], mask: u32) {
apply_mask_fast32(buf, mask)
}
@@ -18,34 +18,41 @@ fn apply_mask_fallback(buf: &mut [u8], mask: &[u8; 4]) {
}
}
/// Faster version of `apply_mask()` which operates on 4-byte blocks.
/// Faster version of `apply_mask()` which operates on 8-byte blocks.
#[inline]
#[allow(dead_code)]
fn apply_mask_fast32(buf: &mut [u8], mask: &[u8; 4]) {
// TODO replace this with read_unaligned() as it stabilizes.
let mask_u32 = unsafe {
let mut m: u32 = uninitialized();
#[allow(trivial_casts)]
copy_nonoverlapping(mask.as_ptr(), &mut m as *mut _ as *mut u8, 4);
m
};
#[cfg_attr(feature="cargo-clippy", allow(cast_lossless))]
fn apply_mask_fast32(buf: &mut [u8], mask_u32: u32) {
let mut ptr = buf.as_mut_ptr();
let mut len = buf.len();
// Possible first unaligned block.
let head = min(len, (4 - (ptr as usize & 3)) & 3);
let head = min(len, (8 - (ptr as usize & 0x7)) & 0x3);
let mask_u32 = if head > 0 {
unsafe {
xor_mem(ptr, mask_u32, head);
ptr = ptr.offset(head as isize);
}
len -= head;
if cfg!(target_endian = "big") {
mask_u32.rotate_left(8 * head as u32)
let n = if head > 4 { head - 4 } else { head };
let mask_u32 = if n > 0 {
unsafe {
xor_mem(ptr, mask_u32, n);
ptr = ptr.offset(head as isize);
}
len -= n;
if cfg!(target_endian = "big") {
mask_u32.rotate_left(8 * n as u32)
} else {
mask_u32.rotate_right(8 * n as u32)
}
} else {
mask_u32.rotate_right(8 * head as u32)
mask_u32
};
if head > 4 {
unsafe {
*(ptr as *mut u32) ^= mask_u32;
ptr = ptr.offset(4);
len -= 4;
}
}
mask_u32
} else {
mask_u32
};
@@ -55,7 +62,20 @@ fn apply_mask_fast32(buf: &mut [u8], mask: &[u8; 4]) {
}
// Properly aligned middle of the data.
while len > 4 {
if len >= 8 {
let mut mask_u64 = mask_u32 as u64;
mask_u64 = mask_u64 << 32 | mask_u32 as u64;
while len >= 8 {
unsafe {
*(ptr as *mut u64) ^= mask_u64;
ptr = ptr.offset(8);
len -= 8;
}
}
}
while len >= 4 {
unsafe {
*(ptr as *mut u32) ^= mask_u32;
ptr = ptr.offset(4);
@@ -83,6 +103,7 @@ unsafe fn xor_mem(ptr: *mut u8, mask: u32, len: usize) {
#[cfg(test)]
mod tests {
use std::ptr;
use super::{apply_mask_fallback, apply_mask_fast32};
#[test]
@@ -90,6 +111,8 @@ mod tests {
let mask = [
0x6d, 0xb6, 0xb2, 0x80,
];
let mask_u32: u32 = unsafe {ptr::read_unaligned(mask.as_ptr() as *const u32)};
let unmasked = vec![
0xf3, 0x00, 0x01, 0x02, 0x03, 0x80, 0x81, 0x82,
0xff, 0xfe, 0x00, 0x17, 0x74, 0xf9, 0x12, 0x03,
@@ -101,7 +124,7 @@ mod tests {
apply_mask_fallback(&mut masked, &mask);
let mut masked_fast = unmasked.clone();
apply_mask_fast32(&mut masked_fast, &mask);
apply_mask_fast32(&mut masked_fast, mask_u32);
assert_eq!(masked, masked_fast);
}
@@ -112,7 +135,7 @@ mod tests {
apply_mask_fallback(&mut masked[1..], &mask);
let mut masked_fast = unmasked.clone();
apply_mask_fast32(&mut masked_fast[1..], &mask);
apply_mask_fast32(&mut masked_fast[1..], mask_u32);
assert_eq!(masked, masked_fast);
}

View File

@@ -63,8 +63,8 @@ mod context;
mod mask;
mod client;
use self::frame::Frame;
use self::proto::{hash_key, OpCode};
pub use self::frame::Frame;
pub use self::proto::OpCode;
pub use self::proto::CloseCode;
pub use self::context::WebsocketContext;
pub use self::client::{Client, ClientError,
@@ -248,7 +248,7 @@ pub fn handshake<S>(req: &HttpRequest<S>) -> Result<HttpResponseBuilder, Handsha
}
let key = {
let key = req.headers().get(header::SEC_WEBSOCKET_KEY).unwrap();
hash_key(key.as_ref())
proto::hash_key(key.as_ref())
};
Ok(HttpResponse::build(StatusCode::SWITCHING_PROTOCOLS)
@@ -304,7 +304,7 @@ impl<S> Stream for WsStream<S> where S: Stream<Item=Bytes, Error=PayloadError> {
}
match opcode {
OpCode::Continue => unimplemented!(),
OpCode::Continue => Err(ProtocolError::NoContinuation),
OpCode::Bad => {
self.closed = true;
Err(ProtocolError::BadOpCode)

View File

@@ -6,7 +6,7 @@ use base64;
use self::OpCode::*;
/// Operation codes as part of rfc6455.
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub(crate) enum OpCode {
pub enum OpCode {
/// Indicates a continuation frame of a fragmented message.
Continue,
/// Indicates a text data frame.

View File

@@ -66,6 +66,21 @@ fn test_simple() {
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
}
#[test]
fn test_with_query_parameter() {
let mut srv = test::TestServer::new(
|app| app.handler(|req: HttpRequest| match req.query().get("qp") {
Some(_) => httpcodes::HTTPOk.build().finish(),
None => httpcodes::HTTPBadRequest.build().finish(),
}));
let request = srv.get().uri(srv.url("/?qp=5").as_str()).finish().unwrap();
let response = srv.execute(request.send()).unwrap();
assert!(response.status().is_success());
}
#[test]
fn test_no_decompress() {
let mut srv = test::TestServer::new(

View File

@@ -743,7 +743,7 @@ fn test_h2() {
})
});
let _res = core.run(tcp);
// assert_eq!(_res.unwrap(), Bytes::from_static(STR.as_ref()));
// assert_eq!(res.unwrap(), Bytes::from_static(STR.as_ref()));
}
#[test]

View File

@@ -35,8 +35,9 @@ fn main() {
-s, --size=[NUMBER] 'size of PUBLISH packet payload to send in KB'
-w, --warm-up=[SECONDS] 'seconds before counter values are considered for reporting'
-r, --sample-rate=[SECONDS] 'seconds between average reports'
-c, --concurrency=[NUMBER] 'number of websockt connections to open and use concurrently for sending'
-t, --threads=[NUMBER] 'number of threads to use'",
-c, --concurrency=[NUMBER] 'number of websocket connections to open and use concurrently for sending'
-t, --threads=[NUMBER] 'number of threads to use'
--max-payload=[NUMBER] 'max size of payload before reconnect KB'",
)
.get_matches();
@@ -50,9 +51,13 @@ fn main() {
let threads = parse_u64_default(matches.value_of("threads"), num_cpus::get() as u64);
let concurrency = parse_u64_default(matches.value_of("concurrency"), 1);
let payload_size: usize = match matches.value_of("size") {
Some(s) => parse_u64_default(Some(s), 0) as usize * 1024,
Some(s) => parse_u64_default(Some(s), 1) as usize * 1024,
None => 1024,
};
let max_payload_size: usize = match matches.value_of("max-payload") {
Some(s) => parse_u64_default(Some(s), 0) as usize * 1024,
None => 0,
};
let warmup_seconds = parse_u64_default(matches.value_of("warm-up"), 2) as u64;
let sample_rate = parse_u64_default(matches.value_of("sample-rate"), 1) as usize;
@@ -64,7 +69,10 @@ fn main() {
let sys = actix::System::new("ws-client");
let mut report = true;
let _: () = Perf{counters: perf_counters.clone(),
payload: payload.len(),
sample_rate_secs: sample_rate}.start();
for t in 0..threads {
let pl = payload.clone();
let ws = ws_url.clone();
@@ -72,40 +80,41 @@ fn main() {
let addr = Arbiter::new(format!("test {}", t));
addr.do_send(actix::msgs::Execute::new(move || -> Result<(), ()> {
let mut reps = report;
for _ in 0..concurrency {
let pl2 = pl.clone();
let perf2 = perf.clone();
let ws2 = ws.clone();
Arbiter::handle().spawn(
ws::Client::new(&ws).connect()
ws::Client::new(&ws)
.write_buffer_capacity(0)
.connect()
.map_err(|e| {
println!("Error: {}", e);
Arbiter::system().do_send(actix::msgs::SystemExit(0));
//Arbiter::system().do_send(actix::msgs::SystemExit(0));
()
})
.map(move |(reader, writer)| {
let addr: Addr<Syn, _> = ChatClient::create(move |ctx| {
ChatClient::add_stream(reader, ctx);
ChatClient{conn: writer,
ChatClient{url: ws2,
conn: writer,
payload: pl2,
report: reps,
bin: bin,
ts: time::precise_time_ns(),
perf_counters: perf2,
sample_rate_secs: sample_rate,
sent: 0,
max_payload_size: max_payload_size,
}
});
})
);
reps = false;
}
Ok(())
}));
report = false;
}
let _ = sys.run();
let res = sys.run();
}
fn parse_u64_default(input: Option<&str>, default: u64) -> u64 {
@@ -113,43 +122,33 @@ fn parse_u64_default(input: Option<&str>, default: u64) -> u64 {
.unwrap_or(default)
}
struct ChatClient{
conn: ws::ClientWriter,
payload: Arc<String>,
ts: u64,
bin: bool,
report: bool,
perf_counters: Arc<PerfCounters>,
struct Perf {
counters: Arc<PerfCounters>,
payload: usize,
sample_rate_secs: usize,
}
impl Actor for ChatClient {
impl Actor for Perf {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Context<Self>) {
self.send_text();
if self.report {
self.sample_rate(ctx);
}
}
fn stopping(&mut self, _: &mut Context<Self>) -> Running {
Arbiter::system().do_send(actix::msgs::SystemExit(0));
Running::Stop
self.sample_rate(ctx);
}
}
impl ChatClient {
impl Perf {
fn sample_rate(&self, ctx: &mut Context<Self>) {
ctx.run_later(Duration::new(self.sample_rate_secs as u64, 0), |act, ctx| {
let req_count = act.perf_counters.pull_request_count();
let req_count = act.counters.pull_request_count();
if req_count != 0 {
let latency = act.perf_counters.pull_latency_ns();
let latency_max = act.perf_counters.pull_latency_max_ns();
let conns = act.counters.pull_connections_count();
let latency = act.counters.pull_latency_ns();
let latency_max = act.counters.pull_latency_max_ns();
println!(
"rate: {}, throughput: {:?} kb, latency: {}, latency max: {}",
"rate: {}, conns: {}, throughput: {:?} kb, latency: {}, latency max: {}",
req_count / act.sample_rate_secs,
(((req_count * act.payload.len()) as f64) / 1024.0) /
conns / act.sample_rate_secs,
(((req_count * act.payload) as f64) / 1024.0) /
act.sample_rate_secs as f64,
time::Duration::nanoseconds((latency / req_count as u64) as i64),
time::Duration::nanoseconds(latency_max as i64)
@@ -159,13 +158,71 @@ impl ChatClient {
act.sample_rate(ctx);
});
}
}
fn send_text(&mut self) {
self.ts = time::precise_time_ns();
if self.bin {
self.conn.binary(&self.payload);
struct ChatClient{
url: String,
conn: ws::ClientWriter,
payload: Arc<String>,
ts: u64,
bin: bool,
perf_counters: Arc<PerfCounters>,
sent: usize,
max_payload_size: usize,
}
impl Actor for ChatClient {
    type Context = Context<Self>;

    /// On connect: send the first message and count this connection.
    fn started(&mut self, _ctx: &mut Context<Self>) {
        // The bool result (whether further sends are allowed) is
        // deliberately ignored for the very first message.
        let _ = self.send_text();
        self.perf_counters.register_connection();
    }
}
impl ChatClient {
    /// Sends one payload message and returns whether this connection may
    /// keep sending.
    ///
    /// When `max_payload_size` is non-zero and this connection has exceeded
    /// it, nothing is sent; instead a replacement connection with the same
    /// configuration is spawned and `false` is returned so the caller can
    /// stop the current actor.
    fn send_text(&mut self) -> bool {
        self.sent += self.payload.len();

        if self.max_payload_size > 0 && self.sent > self.max_payload_size {
            // Per-connection byte budget exhausted: spawn a fresh client
            // carrying over the current configuration.
            let ws = self.url.clone();
            let pl = self.payload.clone();
            let bin = self.bin;
            let perf_counters = self.perf_counters.clone();
            let max_payload_size = self.max_payload_size;

            Arbiter::handle().spawn(
                ws::Client::new(&self.url).connect()
                    .map_err(|e| {
                        println!("Error: {}", e);
                        Arbiter::system().do_send(actix::msgs::SystemExit(0));
                    })
                    .map(move |(reader, writer)| {
                        // The address is not needed; the actor lives on its own.
                        let _addr: Addr<Syn, _> = ChatClient::create(move |ctx| {
                            ChatClient::add_stream(reader, ctx);
                            ChatClient {
                                url: ws,
                                conn: writer,
                                payload: pl,
                                bin,
                                ts: time::precise_time_ns(),
                                perf_counters,
                                sent: 0,
                                max_payload_size,
                            }
                        });
                    })
            );
            false
        } else {
            // Timestamp just before the send so the echo round-trip can be
            // measured when the response arrives.
            self.ts = time::precise_time_ns();
            if self.bin {
                self.conn.binary(&self.payload);
            } else {
                self.conn.text(&self.payload);
            }
            true
        }
    }
}
@@ -183,7 +240,9 @@ impl StreamHandler<ws::Message, ws::ProtocolError> for ChatClient {
if txt == self.payload.as_ref().as_str() {
self.perf_counters.register_request();
self.perf_counters.register_latency(time::precise_time_ns() - self.ts);
self.send_text();
if !self.send_text() {
ctx.stop();
}
} else {
println!("not eaqual");
}
@@ -196,6 +255,7 @@ impl StreamHandler<ws::Message, ws::ProtocolError> for ChatClient {
/// Shared, lock-free performance counters, drained periodically by the
/// reporting actor.
pub struct PerfCounters {
    /// Completed requests since the last pull.
    req: AtomicUsize,
    /// Connections registered since the last pull.
    conn: AtomicUsize,
    /// Accumulated request latency (ns) since the last pull.
    lat: AtomicUsize,
    /// Peak single-request latency (ns) since the last pull.
    lat_max: AtomicUsize,
}
@@ -204,6 +264,7 @@ impl PerfCounters {
pub fn new() -> PerfCounters {
PerfCounters {
req: AtomicUsize::new(0),
conn: AtomicUsize::new(0),
lat: AtomicUsize::new(0),
lat_max: AtomicUsize::new(0),
}
@@ -213,6 +274,10 @@ impl PerfCounters {
self.req.swap(0, Ordering::SeqCst)
}
/// Atomically drains the connection counter: returns how many connections
/// were registered since the previous pull and resets the count to zero.
pub fn pull_connections_count(&self) -> usize {
self.conn.swap(0, Ordering::SeqCst)
}
/// Atomically drains the latency accumulator: returns the total latency in
/// nanoseconds gathered since the previous pull and resets it to zero.
pub fn pull_latency_ns(&self) -> u64 {
self.lat.swap(0, Ordering::SeqCst) as u64
}
@@ -225,6 +290,10 @@ impl PerfCounters {
self.req.fetch_add(1, Ordering::SeqCst);
}
/// Records one newly established connection (called when a `ChatClient`
/// actor starts).
pub fn register_connection(&self) {
self.conn.fetch_add(1, Ordering::SeqCst);
}
pub fn register_latency(&self, nanos: u64) {
let nanos = nanos as usize;
self.lat.fetch_add(nanos, Ordering::SeqCst);