1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-07-22 08:57:17 +02:00

Compare commits

...

50 Commits

Author SHA1 Message Date
Nikolay Kim
ce6d237cc1 prepare 0.4.10 release 2018-03-20 15:53:39 -07:00
Nikolay Kim
70caa2552b simplify httpresponse release 2018-03-20 15:51:19 -07:00
Nikolay Kim
ee7d58dd7f disable h2 2018-03-20 12:35:44 -07:00
Nikolay Kim
c4f4cadb43 Fix http/2 date header generation 2018-03-20 11:40:05 -07:00
Nikolay Kim
978091cedb wake up io task when next chunk of data is needed 2018-03-20 11:37:13 -07:00
Nikolay Kim
8198f5e10a Refactor TestServer configuration 2018-03-20 11:23:35 -07:00
Nikolay Kim
6cd40df387 Fix server websockets big payloads support 2018-03-19 17:27:03 -07:00
Nikolay Kim
35ee5d36d8 actix 0.5.5, ws test 2018-03-19 13:12:36 -07:00
Nikolay Kim
e7ec0f9fd7 ws tests and proper write payload ref 2018-03-19 09:30:58 -07:00
Nikolay Kim
f4a47ef71e allow set client request/ws timeout 2018-03-18 19:27:51 -07:00
Nikolay Kim
6b1a79fab8 update example 2018-03-18 16:27:34 -07:00
Nikolay Kim
ab73da4a1a use Error instead of InternalError for helper methods error::ErrorXXX 2018-03-18 14:18:47 -07:00
Nikolay Kim
e0c8da567c various optimizations 2018-03-18 11:05:44 -07:00
Douman
c10dedf7e4 Merge pull request #124 from DoumanAsh/show_hidden
Show Request's hidden methods
2018-03-17 18:39:21 +03:00
Douman
ec192e0ab1 Show Request's hidden methods 2018-03-17 18:10:22 +03:00
Nikolay Kim
6d792d9948 simplify h1 parse 2018-03-16 20:56:23 -07:00
Nikolay Kim
1fe4315c94 use actix 0.5.4 2018-03-16 13:37:47 -07:00
Nikolay Kim
381b90e9a1 bump version 2018-03-16 12:31:29 -07:00
Nikolay Kim
2d18dba40a fix compilation 2018-03-16 12:28:08 -07:00
Nikolay Kim
d2693d58a8 clippy warnings 2018-03-16 12:12:55 -07:00
Nikolay Kim
84bf282c17 add basic client connection pooling 2018-03-16 12:04:01 -07:00
Nikolay Kim
b15b5e5246 check number of available workers 2018-03-16 11:17:27 -07:00
Douman
52b3b0c362 Merge pull request #119 from DoumanAsh/default_static_files
Add default resource for StaticFiles
2018-03-16 20:12:07 +03:00
Nikolay Kim
64c4cefa8f Merge branch 'master' into default_static_files 2018-03-16 09:31:36 -07:00
Nikolay Kim
7e8b231f57 disable test 2018-03-16 09:13:36 -07:00
Douman
8a344d0c94 Add default resource for StaticFiles 2018-03-16 19:04:36 +03:00
Nikolay Kim
4096089a3f allow to disable http/2 support 2018-03-16 08:48:44 -07:00
Nikolay Kim
b16f2d5f05 proper check for actor context poll 2018-03-16 08:04:26 -07:00
Nikolay Kim
5baf15822a always start actors 2018-03-16 07:46:27 -07:00
Nikolay Kim
5368ce823e Merge pull request #123 from h416/patch-1
fix typo
2018-03-16 05:31:10 -07:00
h416
4effdf065b fix typo 2018-03-16 19:03:16 +09:00
Nikolay Kim
61970ab190 always poll stream or actor for the first time 2018-03-15 17:11:49 -07:00
Nikolay Kim
484b00a0f9 Merge branch 'master' of github.com:actix/actix-web 2018-03-15 16:55:33 -07:00
Nikolay Kim
73bf2068aa allow to use NamedFile with any request method 2018-03-15 16:55:22 -07:00
Nikolay Kim
1cda949204 Merge pull request #122 from mockersf/test_qp
test for query parameters in client
2018-03-14 16:10:31 -07:00
François Mockers
ad6b823255 test for query parameters in client 2018-03-14 21:45:49 +01:00
Nikolay Kim
0f064db31d Move brotli encoding to a feature 2018-03-13 17:21:22 -07:00
Nikolay Kim
fd0bb54469 add debug formatter for ClientRequestBuilder 2018-03-13 15:09:05 -07:00
Nikolay Kim
e27bbaa55c Update CHANGES.md 2018-03-13 13:15:21 -07:00
Nikolay Kim
8a50eae1e2 Merge pull request #121 from glademiller/master
Send Query Parameters in client requests
2018-03-13 13:14:51 -07:00
Glade Miller
38080f67b3 If no path is available from the URI request / 2018-03-13 13:35:11 -06:00
Glade Miller
08504e0892 Move path call inline into write 2018-03-13 13:26:13 -06:00
Glade Miller
401c0ad809 https://github.com/actix/actix-web/issues/120 - Send Query Parameters in client requests 2018-03-13 13:17:55 -06:00
Nikolay Kim
b4b0deb7fa Wake payload reading task when data is available 2018-03-12 16:29:13 -07:00
Nikolay Kim
05ff35d383 Fix server keep-alive handling 2018-03-12 16:16:17 -07:00
Nikolay Kim
29c3e8f7ea update test 2018-03-12 10:19:09 -07:00
Nikolay Kim
6657446433 Allow to set read buffer capacity for server request 2018-03-12 10:01:56 -07:00
Nikolay Kim
46b9a9c887 update readme 2018-03-12 09:13:04 -07:00
Nikolay Kim
b3cdb472d0 remove reserved state for h2 write if buffer is empty 2018-03-12 09:04:54 -07:00
Nikolay Kim
31e1aab9a4 do not log WouldBlock error from socket accept 2018-03-12 09:02:15 -07:00
39 changed files with 1366 additions and 518 deletions

View File

@@ -12,9 +12,6 @@ matrix:
- rust: stable
- rust: beta
- rust: nightly
allow_failures:
- rust: nightly
- rust: beta
#rust:
# - 1.21.0

View File

@@ -1,5 +1,44 @@
# Changes
## 0.4.10 (2018-03-20)
* Use `Error` instead of `InternalError` for `error::ErrorXXXX` methods
* Allow to set client request timeout
* Allow to set client websocket handshake timeout
* Refactor `TestServer` configuration
* Fix server websockets big payloads support
* Fix http/2 date header generation
## 0.4.9 (2018-03-16)
* Allow to disable http/2 support
* Wake payload reading task when data is available
* Fix server keep-alive handling
* Send Query Parameters in client requests #120
* Move brotli encoding to a feature
* Add option of default handler for `StaticFiles` handler #57
* Add basic client connection pooling
## 0.4.8 (2018-03-12)
* Allow to set read buffer capacity for server request
* Handle WouldBlock error for socket accept call
## 0.4.7 (2018-03-11)
* Fix panic on unknown content encoding
@@ -35,7 +74,7 @@
* Better support for `NamedFile` type
* Add `ResponseError` impl for `SendRequestError`. This improves ergonomics of the client.
* Add native-tls support for client
* Allow client connection timeout to be set #108

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-web"
version = "0.4.7"
version = "0.4.10"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix web is a simple, pragmatic, extremely fast, web framework for Rust."
readme = "README.md"
@@ -27,7 +27,7 @@ name = "actix_web"
path = "src/lib.rs"
[features]
default = ["session"]
default = ["session", "brotli"]
# tls
tls = ["native-tls", "tokio-tls"]
@@ -38,12 +38,14 @@ alpn = ["openssl", "openssl/v102", "openssl/v110", "tokio-openssl"]
# sessions
session = ["cookie/secure"]
# brotli encoding
brotli = ["brotli2"]
[dependencies]
actix = "^0.5.2"
actix = "^0.5.5"
base64 = "0.9"
bitflags = "1.0"
brotli2 = "^0.3.2"
failure = "0.1.1"
flate2 = "1.0"
h2 = "0.1"
@@ -67,6 +69,7 @@ encoding = "0.2"
language-tags = "0.2"
url = { version="1.7", features=["query_encoding"] }
cookie = { version="0.10", features=["percent-encode"] }
brotli2 = { version="^0.3.2", optional = true }
# io
mio = "^0.6.13"

View File

@@ -31,7 +31,7 @@ Actix web is a simple, pragmatic, extremely fast, web framework for Rust.
## Example
```rust,ignore
```rust
extern crate actix_web;
use actix_web::*;

View File

@@ -12,7 +12,7 @@ fn main() {
let _ = fs::remove_file(f);
// generates doc tests for `README.md`.
skeptic::generate_doc_tests(
&["README.md",
&[// "README.md",
"guide/src/qs_1.md",
"guide/src/qs_2.md",
"guide/src/qs_3.md",

View File

@@ -124,7 +124,8 @@ fn main() {
}
}))
.resource("/error.html", |r| r.f(|req| {
error::ErrorBadRequest(io::Error::new(io::ErrorKind::Other, "test"))
error::InternalError::new(
io::Error::new(io::ErrorKind::Other, "test"), StatusCode::OK)
}))
// static files
.handler("/static/", fs::StaticFiles::new("../static/", true))

View File

@@ -1,4 +1,4 @@
# websockect
# websocket
Simple echo websocket server.

View File

@@ -235,6 +235,7 @@ impl<'a> From<&'a Arc<Vec<u8>>> for Binary {
}
impl AsRef<[u8]> for Binary {
#[inline]
fn as_ref(&self) -> &[u8] {
match *self {
Binary::Bytes(ref bytes) => bytes.as_ref(),

View File

@@ -1,15 +1,18 @@
use std::{io, time};
use std::{fmt, io, time};
use std::cell::RefCell;
use std::rc::Rc;
use std::net::Shutdown;
use std::time::Duration;
use std::time::{Duration, Instant};
use std::collections::{HashMap, VecDeque};
use actix::{fut, Actor, ActorFuture, Context,
use actix::{fut, Actor, ActorFuture, Context, AsyncContext,
Handler, Message, ActorResponse, Supervised};
use actix::registry::ArbiterService;
use actix::fut::WrapFuture;
use actix::actors::{Connector, ConnectorError, Connect as ResolveConnect};
use http::{Uri, HttpTryFrom, Error as HttpError};
use futures::Poll;
use futures::{Async, Poll};
use tokio_io::{AsyncRead, AsyncWrite};
#[cfg(feature="alpn")]
@@ -104,10 +107,15 @@ pub struct ClientConnector {
connector: SslConnector,
#[cfg(all(feature="tls", not(feature="alpn")))]
connector: TlsConnector,
pool: Rc<Pool>,
}
impl Actor for ClientConnector {
type Context = Context<ClientConnector>;
fn started(&mut self, ctx: &mut Self::Context) {
self.collect(ctx);
}
}
impl Supervised for ClientConnector {}
@@ -120,19 +128,21 @@ impl Default for ClientConnector {
{
let builder = SslConnector::builder(SslMethod::tls()).unwrap();
ClientConnector {
connector: builder.build()
connector: builder.build(),
pool: Rc::new(Pool::new()),
}
}
#[cfg(all(feature="tls", not(feature="alpn")))]
{
let builder = TlsConnector::builder().unwrap();
ClientConnector {
connector: builder.build().unwrap()
connector: builder.build().unwrap(),
pool: Rc::new(Pool::new()),
}
}
#[cfg(not(any(feature="alpn", feature="tls")))]
ClientConnector {}
ClientConnector {pool: Rc::new(Pool::new())}
}
}
@@ -182,7 +192,12 @@ impl ClientConnector {
/// }
/// ```
pub fn with_connector(connector: SslConnector) -> ClientConnector {
ClientConnector { connector }
ClientConnector { connector, pool: Rc::new(Pool::new()) }
}
fn collect(&mut self, ctx: &mut Context<Self>) {
self.pool.collect();
ctx.run_later(Duration::from_secs(1), |act, ctx| act.collect(ctx));
}
}
@@ -214,10 +229,21 @@ impl Handler<Connect> for ClientConnector {
let host = uri.host().unwrap().to_owned();
let port = uri.port().unwrap_or_else(|| proto.port());
let key = Key {host, port, ssl: proto.is_secure()};
let pool = if proto.is_http() {
if let Some(conn) = self.pool.query(&key) {
return ActorResponse::async(fut::ok(conn))
} else {
Some(Rc::clone(&self.pool))
}
} else {
None
};
ActorResponse::async(
Connector::from_registry()
.send(ResolveConnect::host_and_port(&host, port)
.send(ResolveConnect::host_and_port(&key.host, port)
.timeout(conn_timeout))
.into_actor(self)
.map_err(|_, _, _| ClientConnectorError::Disconnected)
@@ -228,12 +254,14 @@ impl Handler<Connect> for ClientConnector {
Ok(stream) => {
if proto.is_secure() {
fut::Either::A(
_act.connector.connect_async(&host, stream)
_act.connector.connect_async(&key.host, stream)
.map_err(ClientConnectorError::SslError)
.map(|stream| Connection{stream: Box::new(stream)})
.map(|stream| Connection::new(
key, pool, Box::new(stream)))
.into_actor(_act))
} else {
fut::Either::B(fut::ok(Connection{stream: Box::new(stream)}))
fut::Either::B(fut::ok(
Connection::new(key, pool, Box::new(stream))))
}
}
}
@@ -244,12 +272,14 @@ impl Handler<Connect> for ClientConnector {
Ok(stream) => {
if proto.is_secure() {
fut::Either::A(
_act.connector.connect_async(&host, stream)
_act.connector.connect_async(&key.host, stream)
.map_err(ClientConnectorError::SslError)
.map(|stream| Connection{stream: Box::new(stream)})
.map(|stream| Connection::new(
key, pool, Box::new(stream)))
.into_actor(_act))
} else {
fut::Either::B(fut::ok(Connection{stream: Box::new(stream)}))
fut::Either::B(fut::ok(
Connection::new(key, pool, Box::new(stream))))
}
}
}
@@ -261,7 +291,7 @@ impl Handler<Connect> for ClientConnector {
if proto.is_secure() {
fut::err(ClientConnectorError::SslIsNotSupported)
} else {
fut::ok(Connection{stream: Box::new(stream)})
fut::ok(Connection::new(key, pool, Box::new(stream)))
}
}
}
@@ -288,6 +318,13 @@ impl Protocol {
}
}
fn is_http(&self) -> bool {
match *self {
Protocol::Https | Protocol::Http => true,
_ => false,
}
}
fn is_secure(&self) -> bool {
match *self {
Protocol::Https | Protocol::Wss => true,
@@ -303,18 +340,156 @@ impl Protocol {
}
}
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
struct Key {
host: String,
port: u16,
ssl: bool,
}
impl Key {
fn empty() -> Key {
Key{host: String::new(), port: 0, ssl: false}
}
}
#[derive(Debug)]
struct Conn(Instant, Connection);
pub struct Pool {
max_size: usize,
keep_alive: Duration,
max_lifetime: Duration,
pool: RefCell<HashMap<Key, VecDeque<Conn>>>,
to_close: RefCell<Vec<Connection>>,
}
impl Pool {
fn new() -> Pool {
Pool {
max_size: 128,
keep_alive: Duration::from_secs(15),
max_lifetime: Duration::from_secs(75),
pool: RefCell::new(HashMap::new()),
to_close: RefCell::new(Vec::new()),
}
}
fn collect(&self) {
let mut pool = self.pool.borrow_mut();
let mut to_close = self.to_close.borrow_mut();
// check keep-alive
let now = Instant::now();
for conns in pool.values_mut() {
while !conns.is_empty() {
if (now - conns[0].0) > self.keep_alive
|| (now - conns[0].1.ts) > self.max_lifetime
{
let conn = conns.pop_front().unwrap().1;
to_close.push(conn);
} else {
break
}
}
}
// check connections for shutdown
let mut idx = 0;
while idx < to_close.len() {
match AsyncWrite::shutdown(&mut to_close[idx]) {
Ok(Async::NotReady) => idx += 1,
_ => {
to_close.swap_remove(idx);
},
}
}
}
fn query(&self, key: &Key) -> Option<Connection> {
let mut pool = self.pool.borrow_mut();
let mut to_close = self.to_close.borrow_mut();
if let Some(ref mut connections) = pool.get_mut(key) {
let now = Instant::now();
while let Some(conn) = connections.pop_back() {
// check if it still usable
if (now - conn.0) > self.keep_alive
|| (now - conn.1.ts) > self.max_lifetime
{
to_close.push(conn.1);
} else {
let mut conn = conn.1;
let mut buf = [0; 2];
match conn.stream().read(&mut buf) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => (),
Ok(n) if n > 0 => {
to_close.push(conn);
continue
},
Ok(_) | Err(_) => continue,
}
return Some(conn)
}
}
}
None
}
fn release(&self, conn: Connection) {
if (Instant::now() - conn.ts) < self.max_lifetime {
let mut pool = self.pool.borrow_mut();
if !pool.contains_key(&conn.key) {
let key = conn.key.clone();
let mut vec = VecDeque::new();
vec.push_back(Conn(Instant::now(), conn));
pool.insert(key, vec);
} else {
let vec = pool.get_mut(&conn.key).unwrap();
vec.push_back(Conn(Instant::now(), conn));
if vec.len() > self.max_size {
let conn = vec.pop_front().unwrap();
self.to_close.borrow_mut().push(conn.1);
}
}
}
}
}
pub struct Connection {
key: Key,
stream: Box<IoStream>,
pool: Option<Rc<Pool>>,
ts: Instant,
}
impl fmt::Debug for Connection {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Connection {}:{}", self.key.host, self.key.port)
}
}
impl Connection {
fn new(key: Key, pool: Option<Rc<Pool>>, stream: Box<IoStream>) -> Self {
Connection {
key, pool, stream,
ts: Instant::now(),
}
}
pub fn stream(&mut self) -> &mut IoStream {
&mut *self.stream
}
pub fn from_stream<T: IoStream>(io: T) -> Connection {
Connection{stream: Box::new(io)}
Connection::new(Key::empty(), None, Box::new(io))
}
pub fn release(mut self) {
if let Some(pool) = self.pool.take() {
pool.release(self)
}
}
}

View File

@@ -171,7 +171,8 @@ impl Future for SendRequest {
};
let pl = Box::new(Pipeline {
body, conn, writer,
body, writer,
conn: Some(conn),
parser: Some(HttpResponseParser::default()),
parser_buf: BytesMut::new(),
disconnected: false,
@@ -208,7 +209,7 @@ impl Future for SendRequest {
pub(crate) struct Pipeline {
body: IoBody,
conn: Connection,
conn: Option<Connection>,
writer: HttpClientWriter,
parser: Option<HttpResponseParser>,
parser_buf: BytesMut,
@@ -249,30 +250,45 @@ impl RunningState {
impl Pipeline {
fn release_conn(&mut self) {
if let Some(conn) = self.conn.take() {
conn.release()
}
}
#[inline]
pub fn parse(&mut self) -> Poll<ClientResponse, HttpResponseParserError> {
match self.parser.as_mut().unwrap().parse(&mut self.conn, &mut self.parser_buf) {
Ok(Async::Ready(resp)) => {
// check content-encoding
if self.should_decompress {
if let Some(enc) = resp.headers().get(CONTENT_ENCODING) {
if let Ok(enc) = enc.to_str() {
match ContentEncoding::from(enc) {
ContentEncoding::Auto | ContentEncoding::Identity => (),
enc => self.decompress = Some(PayloadStream::new(enc)),
fn parse(&mut self) -> Poll<ClientResponse, HttpResponseParserError> {
if let Some(ref mut conn) = self.conn {
match self.parser.as_mut().unwrap().parse(conn, &mut self.parser_buf) {
Ok(Async::Ready(resp)) => {
// check content-encoding
if self.should_decompress {
if let Some(enc) = resp.headers().get(CONTENT_ENCODING) {
if let Ok(enc) = enc.to_str() {
match ContentEncoding::from(enc) {
ContentEncoding::Auto | ContentEncoding::Identity => (),
enc => self.decompress = Some(PayloadStream::new(enc)),
}
}
}
}
}
Ok(Async::Ready(resp))
Ok(Async::Ready(resp))
}
val => val,
}
val => val,
} else {
Ok(Async::NotReady)
}
}
#[inline]
pub fn poll(&mut self) -> Poll<Option<Bytes>, PayloadError> {
if self.conn.is_none() {
return Ok(Async::Ready(None))
}
let conn: &mut Connection = unsafe{ mem::transmute(self.conn.as_mut().unwrap())};
let mut need_run = false;
// need write?
@@ -286,7 +302,7 @@ impl Pipeline {
if self.parser.is_some() {
loop {
match self.parser.as_mut().unwrap()
.parse_payload(&mut self.conn, &mut self.parser_buf)?
.parse_payload(conn, &mut self.parser_buf)?
{
Async::Ready(Some(b)) => {
if let Some(ref mut decompress) = self.decompress {
@@ -314,6 +330,7 @@ impl Pipeline {
if let Some(mut decompress) = self.decompress.take() {
let res = decompress.feed_eof();
if let Some(b) = res? {
self.release_conn();
return Ok(Async::Ready(Some(b)))
}
}
@@ -321,13 +338,14 @@ impl Pipeline {
if need_run {
Ok(Async::NotReady)
} else {
self.release_conn();
Ok(Async::Ready(None))
}
}
#[inline]
pub fn poll_write(&mut self) -> Poll<(), Error> {
if self.write_state == RunningState::Done {
fn poll_write(&mut self) -> Poll<(), Error> {
if self.write_state == RunningState::Done || self.conn.is_none() {
return Ok(Async::Ready(()))
}
@@ -416,7 +434,7 @@ impl Pipeline {
}
// flush io but only if we need to
match self.writer.poll_completed(&mut self.conn, false) {
match self.writer.poll_completed(self.conn.as_mut().unwrap(), false) {
Ok(Async::Ready(_)) => {
if self.disconnected {
self.write_state = RunningState::Done;

View File

@@ -1,6 +1,7 @@
use std::{fmt, mem};
use std::fmt::Write as FmtWrite;
use std::io::Write;
use std::time::Duration;
use actix::{Addr, Unsync};
use cookie::{Cookie, CookieJar};
@@ -26,6 +27,7 @@ pub struct ClientRequest {
body: Body,
chunked: bool,
upgrade: bool,
timeout: Option<Duration>,
encoding: ContentEncoding,
response_decompress: bool,
buffer_capacity: usize,
@@ -49,6 +51,7 @@ impl Default for ClientRequest {
body: Body::Empty,
chunked: false,
upgrade: false,
timeout: None,
encoding: ContentEncoding::Auto,
response_decompress: true,
buffer_capacity: 32_768,
@@ -204,10 +207,16 @@ impl ClientRequest {
///
/// This method returns future that resolves to a ClientResponse
pub fn send(mut self) -> SendRequest {
match mem::replace(&mut self.conn, ConnectionType::Default) {
let timeout = self.timeout.take();
let send = match mem::replace(&mut self.conn, ConnectionType::Default) {
ConnectionType::Default => SendRequest::new(self),
ConnectionType::Connector(conn) => SendRequest::with_connector(self, conn),
ConnectionType::Connection(conn) => SendRequest::with_connection(self, conn),
};
if let Some(timeout) = timeout {
send.timeout(timeout)
} else {
send
}
}
}
@@ -224,7 +233,6 @@ impl fmt::Debug for ClientRequest {
}
}
/// An HTTP Client request builder
///
/// This type can be used to construct an instance of `ClientRequest` through a
@@ -476,6 +484,17 @@ impl ClientRequestBuilder {
self
}
/// Set request timeout
///
/// Request timeout is a total time before response should be received.
/// Default value is 5 seconds.
pub fn timeout(&mut self, timeout: Duration) -> &mut Self {
if let Some(parts) = parts(&mut self.request, &self.err) {
parts.timeout = Some(timeout);
}
self
}
/// Send request using custom connector
pub fn with_connector(&mut self, conn: Addr<Unsync, ClientConnector>) -> &mut Self {
if let Some(parts) = parts(&mut self.request, &self.err) {
@@ -599,3 +618,19 @@ fn parts<'a>(parts: &'a mut Option<ClientRequest>, err: &Option<HttpError>)
}
parts.as_mut()
}
impl fmt::Debug for ClientRequestBuilder {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if let Some(ref parts) = self.request {
let res = write!(f, "\nClientRequestBuilder {:?} {}:{}\n",
parts.version, parts.method, parts.uri);
let _ = write!(f, " headers:\n");
for (key, val) in parts.headers.iter() {
let _ = write!(f, " {:?}: {:?}\n", key, val);
}
res
} else {
write!(f, "ClientRequestBuilder(Consumed)")
}
}
}

View File

@@ -13,10 +13,11 @@ use http::header::{HeaderValue, DATE,
CONNECTION, CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING};
use flate2::Compression;
use flate2::write::{GzEncoder, DeflateEncoder};
#[cfg(feature="brotli")]
use brotli2::write::BrotliEncoder;
use body::{Body, Binary};
use headers::ContentEncoding;
use header::ContentEncoding;
use server::WriterState;
use server::shared::SharedBytes;
use server::encoding::{ContentEncoder, TransferEncoding};
@@ -112,7 +113,9 @@ impl HttpClientWriter {
// status line
let _ = write!(buffer, "{} {} {:?}\r\n",
msg.method(), msg.uri().path(), msg.version());
msg.method(),
msg.uri().path_and_query().map(|u| u.as_str()).unwrap_or("/"),
msg.version());
// write headers
for (key, value) in msg.headers() {
@@ -213,6 +216,7 @@ fn content_encoder(buf: SharedBytes, req: &mut ClientRequest) -> ContentEncoder
DeflateEncoder::new(transfer, Compression::default())),
ContentEncoding::Gzip => ContentEncoder::Gzip(
GzEncoder::new(transfer, Compression::default())),
#[cfg(feature="brotli")]
ContentEncoding::Br => ContentEncoder::Br(
BrotliEncoder::new(transfer, 5)),
ContentEncoding::Identity => ContentEncoder::Identity(transfer),
@@ -262,6 +266,7 @@ fn content_encoder(buf: SharedBytes, req: &mut ClientRequest) -> ContentEncoder
DeflateEncoder::new(transfer, Compression::default())),
ContentEncoding::Gzip => ContentEncoder::Gzip(
GzEncoder::new(transfer, Compression::default())),
#[cfg(feature="brotli")]
ContentEncoding::Br => ContentEncoder::Br(
BrotliEncoder::new(transfer, 5)),
ContentEncoding::Identity | ContentEncoding::Auto => ContentEncoder::Identity(transfer),

View File

@@ -191,7 +191,7 @@ impl<A, S> ActorHttpContext for HttpContext<A, S> where A: Actor<Context=Self>,
if self.inner.alive() {
match self.inner.poll(ctx) {
Ok(Async::NotReady) | Ok(Async::Ready(())) => (),
Err(_) => return Err(ErrorInternalServerError("error").into()),
Err(_) => return Err(ErrorInternalServerError("error")),
}
}

View File

@@ -575,68 +575,91 @@ impl<T> Responder for InternalError<T>
/// Helper function that creates wrapper of any error and generate *BAD REQUEST* response.
#[allow(non_snake_case)]
pub fn ErrorBadRequest<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::BAD_REQUEST)
pub fn ErrorBadRequest<T>(err: T) -> Error
where T: Send + Sync + fmt::Debug + 'static
{
InternalError::new(err, StatusCode::BAD_REQUEST).into()
}
/// Helper function that creates wrapper of any error and generate *UNAUTHORIZED* response.
#[allow(non_snake_case)]
pub fn ErrorUnauthorized<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::UNAUTHORIZED)
pub fn ErrorUnauthorized<T>(err: T) -> Error
where T: Send + Sync + fmt::Debug + 'static
{
InternalError::new(err, StatusCode::UNAUTHORIZED).into()
}
/// Helper function that creates wrapper of any error and generate *FORBIDDEN* response.
#[allow(non_snake_case)]
pub fn ErrorForbidden<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::FORBIDDEN)
pub fn ErrorForbidden<T>(err: T) -> Error
where T: Send + Sync + fmt::Debug + 'static
{
InternalError::new(err, StatusCode::FORBIDDEN).into()
}
/// Helper function that creates wrapper of any error and generate *NOT FOUND* response.
#[allow(non_snake_case)]
pub fn ErrorNotFound<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::NOT_FOUND)
pub fn ErrorNotFound<T>(err: T) -> Error
where T: Send + Sync + fmt::Debug + 'static
{
InternalError::new(err, StatusCode::NOT_FOUND).into()
}
/// Helper function that creates wrapper of any error and generate *METHOD NOT ALLOWED* response.
#[allow(non_snake_case)]
pub fn ErrorMethodNotAllowed<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::METHOD_NOT_ALLOWED)
pub fn ErrorMethodNotAllowed<T>(err: T) -> Error
where T: Send + Sync + fmt::Debug + 'static
{
InternalError::new(err, StatusCode::METHOD_NOT_ALLOWED).into()
}
/// Helper function that creates wrapper of any error and generate *REQUEST TIMEOUT* response.
#[allow(non_snake_case)]
pub fn ErrorRequestTimeout<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::REQUEST_TIMEOUT)
pub fn ErrorRequestTimeout<T>(err: T) -> Error
where T: Send + Sync + fmt::Debug + 'static
{
InternalError::new(err, StatusCode::REQUEST_TIMEOUT).into()
}
/// Helper function that creates wrapper of any error and generate *CONFLICT* response.
#[allow(non_snake_case)]
pub fn ErrorConflict<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::CONFLICT)
pub fn ErrorConflict<T>(err: T) -> Error
where T: Send + Sync + fmt::Debug + 'static
{
InternalError::new(err, StatusCode::CONFLICT).into()
}
/// Helper function that creates wrapper of any error and generate *GONE* response.
#[allow(non_snake_case)]
pub fn ErrorGone<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::GONE)
pub fn ErrorGone<T>(err: T) -> Error
where T: Send + Sync + fmt::Debug + 'static
{
InternalError::new(err, StatusCode::GONE).into()
}
/// Helper function that creates wrapper of any error and generate *PRECONDITION FAILED* response.
#[allow(non_snake_case)]
pub fn ErrorPreconditionFailed<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::PRECONDITION_FAILED)
pub fn ErrorPreconditionFailed<T>(err: T) -> Error
where T: Send + Sync + fmt::Debug + 'static
{
InternalError::new(err, StatusCode::PRECONDITION_FAILED).into()
}
/// Helper function that creates wrapper of any error and generate *EXPECTATION FAILED* response.
#[allow(non_snake_case)]
pub fn ErrorExpectationFailed<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::EXPECTATION_FAILED)
pub fn ErrorExpectationFailed<T>(err: T) -> Error
where T: Send + Sync + fmt::Debug + 'static
{
InternalError::new(err, StatusCode::EXPECTATION_FAILED).into()
}
/// Helper function that creates wrapper of any error and generate *INTERNAL SERVER ERROR* response.
/// Helper function that creates wrapper of any error and
/// generate *INTERNAL SERVER ERROR* response.
#[allow(non_snake_case)]
pub fn ErrorInternalServerError<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::INTERNAL_SERVER_ERROR)
pub fn ErrorInternalServerError<T>(err: T) -> Error
where T: Send + Sync + fmt::Debug + 'static
{
InternalError::new(err, StatusCode::INTERNAL_SERVER_ERROR).into()
}
#[cfg(test)]

118
src/fs.rs
View File

@@ -21,11 +21,11 @@ use mime_guess::get_mime_type;
use header;
use error::Error;
use param::FromParam;
use handler::{Handler, Responder};
use handler::{Handler, RouteHandler, WrapHandler, Responder, Reply};
use httpmessage::HttpMessage;
use httprequest::HttpRequest;
use httpresponse::HttpResponse;
use httpcodes::{HttpOk, HttpFound, HttpMethodNotAllowed};
use httpcodes::{HttpOk, HttpFound, HttpNotFound, HttpMethodNotAllowed};
/// A file with an associated name; responds with the Content-Type based on the
/// file extension.
@@ -36,6 +36,7 @@ pub struct NamedFile {
md: Metadata,
modified: Option<SystemTime>,
cpu_pool: Option<CpuPool>,
only_get: bool,
}
impl NamedFile {
@@ -54,7 +55,14 @@ impl NamedFile {
let path = path.as_ref().to_path_buf();
let modified = md.modified().ok();
let cpu_pool = None;
Ok(NamedFile{path, file, md, modified, cpu_pool})
Ok(NamedFile{path, file, md, modified, cpu_pool, only_get: false})
}
/// Allow only GET and HEAD methods
#[inline]
pub fn only_get(mut self) -> Self {
self.only_get = true;
self
}
/// Returns reference to the underlying `File` object.
@@ -168,7 +176,7 @@ impl Responder for NamedFile {
type Error = io::Error;
fn respond_to(self, req: HttpRequest) -> Result<HttpResponse, io::Error> {
if *req.method() != Method::GET && *req.method() != Method::HEAD {
if self.only_get && *req.method() != Method::GET && *req.method() != Method::HEAD {
return Ok(HttpMethodNotAllowed.build()
.header(header::http::CONTENT_TYPE, "text/plain")
.header(header::http::ALLOW, "GET, HEAD")
@@ -215,7 +223,9 @@ impl Responder for NamedFile {
return Ok(resp.status(StatusCode::NOT_MODIFIED).finish().unwrap())
}
if *req.method() == Method::GET {
if *req.method() == Method::HEAD {
Ok(resp.finish().unwrap())
} else {
let reader = ChunkedReadFile {
size: self.md.len(),
offset: 0,
@@ -224,8 +234,6 @@ impl Responder for NamedFile {
fut: None,
};
Ok(resp.streaming(reader).unwrap())
} else {
Ok(resp.finish().unwrap())
}
}
}
@@ -354,27 +362,6 @@ impl Responder for Directory {
}
}
/// This enum represents all filesystem elements.
pub enum FilesystemElement {
File(NamedFile),
Directory(Directory),
Redirect(HttpResponse),
}
impl Responder for FilesystemElement {
type Item = HttpResponse;
type Error = io::Error;
fn respond_to(self, req: HttpRequest) -> Result<HttpResponse, io::Error> {
match self {
FilesystemElement::File(file) => file.respond_to(req),
FilesystemElement::Directory(dir) => dir.respond_to(req),
FilesystemElement::Redirect(resp) => Ok(resp),
}
}
}
/// Static files handling
///
/// `StaticFile` handler must be registered with `Application::handler()` method,
@@ -390,23 +377,24 @@ impl Responder for FilesystemElement {
/// .finish();
/// }
/// ```
pub struct StaticFiles {
pub struct StaticFiles<S> {
directory: PathBuf,
accessible: bool,
index: Option<String>,
show_index: bool,
cpu_pool: CpuPool,
default: Box<RouteHandler<S>>,
_chunk_size: usize,
_follow_symlinks: bool,
}
impl StaticFiles {
impl<S: 'static> StaticFiles<S> {
/// Create new `StaticFiles` instance
///
/// `dir` - base directory
///
/// `index` - show index for directory
pub fn new<T: Into<PathBuf>>(dir: T, index: bool) -> StaticFiles {
pub fn new<T: Into<PathBuf>>(dir: T, index: bool) -> StaticFiles<S> {
let dir = dir.into();
let (dir, access) = match dir.canonicalize() {
@@ -430,6 +418,7 @@ impl StaticFiles {
index: None,
show_index: index,
cpu_pool: CpuPool::new(40),
default: Box::new(WrapHandler::new(|_| HttpNotFound)),
_chunk_size: 0,
_follow_symlinks: false,
}
@@ -439,28 +428,30 @@ impl StaticFiles {
///
/// Redirects to specific index file for directory "/" instead of
/// showing files listing.
pub fn index_file<T: Into<String>>(mut self, index: T) -> StaticFiles {
pub fn index_file<T: Into<String>>(mut self, index: T) -> StaticFiles<S> {
self.index = Some(index.into());
self
}
/// Sets default resource which is used when no matched file could be found.
pub fn default_handler<H: Handler<S>>(mut self, handler: H) -> StaticFiles<S> {
self.default = Box::new(WrapHandler::new(handler));
self
}
}
impl<S> Handler<S> for StaticFiles {
type Result = Result<FilesystemElement, io::Error>;
impl<S: 'static> Handler<S> for StaticFiles<S> {
type Result = Result<Reply, Error>;
fn handle(&mut self, req: HttpRequest<S>) -> Self::Result {
if !self.accessible {
Err(io::Error::new(io::ErrorKind::NotFound, "not found"))
Ok(self.default.handle(req))
} else {
let path = if let Some(path) = req.match_info().get("tail") {
path
} else {
return Err(io::Error::new(io::ErrorKind::NotFound, "not found"))
let relpath = match req.match_info().get("tail").map(PathBuf::from_param) {
Some(Ok(path)) => path,
_ => return Ok(self.default.handle(req))
};
let relpath = PathBuf::from_param(path)
.map_err(|_| io::Error::new(io::ErrorKind::NotFound, "not found"))?;
// full filepath
let path = self.directory.join(&relpath).canonicalize()?;
@@ -474,20 +465,21 @@ impl<S> Handler<S> for StaticFiles {
new_path.push('/');
}
new_path.push_str(redir_index);
Ok(FilesystemElement::Redirect(
HttpFound
.build()
.header(header::http::LOCATION, new_path.as_str())
.finish().unwrap()))
HttpFound.build()
.header(header::http::LOCATION, new_path.as_str())
.finish().unwrap()
.respond_to(req.without_state())
} else if self.show_index {
Ok(FilesystemElement::Directory(
Directory::new(self.directory.clone(), path)))
Directory::new(self.directory.clone(), path)
.respond_to(req.without_state())?
.respond_to(req.without_state())
} else {
Err(io::Error::new(io::ErrorKind::NotFound, "not found"))
Ok(self.default.handle(req))
}
} else {
Ok(FilesystemElement::File(
NamedFile::open(path)?.set_cpu_pool(self.cpu_pool.clone())))
NamedFile::open(path)?.set_cpu_pool(self.cpu_pool.clone())
.respond_to(req.without_state())?
.respond_to(req.without_state())
}
}
}
@@ -517,25 +509,38 @@ mod tests {
let req = TestRequest::default().method(Method::POST).finish();
let file = NamedFile::open("Cargo.toml").unwrap();
let resp = file.respond_to(req).unwrap();
let resp = file.only_get().respond_to(req).unwrap();
assert_eq!(resp.status(), StatusCode::METHOD_NOT_ALLOWED);
}
#[test]
fn test_named_file_any_method() {
let req = TestRequest::default().method(Method::POST).finish();
let file = NamedFile::open("Cargo.toml").unwrap();
let resp = file.respond_to(req).unwrap();
assert_eq!(resp.status(), StatusCode::OK);
}
#[test]
fn test_static_files() {
let mut st = StaticFiles::new(".", true);
st.accessible = false;
assert!(st.handle(HttpRequest::default()).is_err());
let resp = st.handle(HttpRequest::default()).respond_to(HttpRequest::default()).unwrap();
let resp = resp.as_response().expect("HTTP Response");
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
st.accessible = true;
st.show_index = false;
assert!(st.handle(HttpRequest::default()).is_err());
let resp = st.handle(HttpRequest::default()).respond_to(HttpRequest::default()).unwrap();
let resp = resp.as_response().expect("HTTP Response");
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
let mut req = HttpRequest::default();
req.match_info_mut().add("tail", "");
st.show_index = true;
let resp = st.handle(req).respond_to(HttpRequest::default()).unwrap();
let resp = resp.as_response().expect("HTTP Response");
assert_eq!(resp.headers().get(header::CONTENT_TYPE).unwrap(), "text/html; charset=utf-8");
assert!(resp.body().is_binary());
assert!(format!("{:?}", resp.body()).contains("README.md"));
@@ -548,6 +553,7 @@ mod tests {
req.match_info_mut().add("tail", "guide");
let resp = st.handle(req).respond_to(HttpRequest::default()).unwrap();
let resp = resp.as_response().expect("HTTP Response");
assert_eq!(resp.status(), StatusCode::FOUND);
assert_eq!(resp.headers().get(header::LOCATION).unwrap(), "/guide/index.html");
@@ -555,6 +561,7 @@ mod tests {
req.match_info_mut().add("tail", "guide/");
let resp = st.handle(req).respond_to(HttpRequest::default()).unwrap();
let resp = resp.as_response().expect("HTTP Response");
assert_eq!(resp.status(), StatusCode::FOUND);
assert_eq!(resp.headers().get(header::LOCATION).unwrap(), "/guide/index.html");
}
@@ -566,6 +573,7 @@ mod tests {
req.match_info_mut().add("tail", "examples/basics");
let resp = st.handle(req).respond_to(HttpRequest::default()).unwrap();
let resp = resp.as_response().expect("HTTP Response");
assert_eq!(resp.status(), StatusCode::FOUND);
assert_eq!(resp.headers().get(header::LOCATION).unwrap(), "/examples/basics/Cargo.toml");
}

View File

@@ -119,6 +119,7 @@ pub enum ContentEncoding {
/// Automatically select encoding based on encoding negotiation
Auto,
/// A format using the Brotli algorithm
#[cfg(feature="brotli")]
Br,
/// A format using the zlib structure with deflate algorithm
Deflate,
@@ -141,15 +142,19 @@ impl ContentEncoding {
#[inline]
pub fn as_str(&self) -> &'static str {
match *self {
#[cfg(feature="brotli")]
ContentEncoding::Br => "br",
ContentEncoding::Gzip => "gzip",
ContentEncoding::Deflate => "deflate",
ContentEncoding::Identity | ContentEncoding::Auto => "identity",
}
}
#[inline]
/// default quality value
pub fn quality(&self) -> f64 {
match *self {
#[cfg(feature="brotli")]
ContentEncoding::Br => 1.1,
ContentEncoding::Gzip => 1.0,
ContentEncoding::Deflate => 0.9,
@@ -162,6 +167,7 @@ impl ContentEncoding {
impl<'a> From<&'a str> for ContentEncoding {
fn from(s: &'a str) -> ContentEncoding {
match s.trim().to_lowercase().as_ref() {
#[cfg(feature="brotli")]
"br" => ContentEncoding::Br,
"gzip" => ContentEncoding::Gzip,
"deflate" => ContentEncoding::Deflate,

View File

@@ -1,71 +1,13 @@
use std::{str, mem, ptr, slice};
use std::{mem, ptr, slice};
use std::cell::RefCell;
use std::fmt::{self, Write};
use std::rc::Rc;
use std::ops::{Deref, DerefMut};
use std::collections::VecDeque;
use time;
use bytes::{BufMut, BytesMut};
use http::Version;
use httprequest::HttpInnerMessage;
// "Sun, 06 Nov 1994 08:49:37 GMT".len()
pub(crate) const DATE_VALUE_LENGTH: usize = 29;
pub(crate) fn date(dst: &mut BytesMut) {
CACHED.with(|cache| {
let mut buf: [u8; 39] = unsafe { mem::uninitialized() };
buf[..6].copy_from_slice(b"date: ");
buf[6..35].copy_from_slice(cache.borrow().buffer());
buf[35..].copy_from_slice(b"\r\n\r\n");
dst.extend_from_slice(&buf);
})
}
pub(crate) fn date_value(dst: &mut BytesMut) {
CACHED.with(|cache| {
dst.extend_from_slice(cache.borrow().buffer());
})
}
pub(crate) fn update_date() {
CACHED.with(|cache| {
cache.borrow_mut().update();
});
}
struct CachedDate {
bytes: [u8; DATE_VALUE_LENGTH],
pos: usize,
}
thread_local!(static CACHED: RefCell<CachedDate> = RefCell::new(CachedDate {
bytes: [0; DATE_VALUE_LENGTH],
pos: 0,
}));
impl CachedDate {
fn buffer(&self) -> &[u8] {
&self.bytes[..]
}
fn update(&mut self) {
self.pos = 0;
write!(self, "{}", time::at_utc(time::get_time()).rfc822()).unwrap();
assert_eq!(self.pos, DATE_VALUE_LENGTH);
}
}
impl fmt::Write for CachedDate {
fn write_str(&mut self, s: &str) -> fmt::Result {
let len = s.len();
self.bytes[self.pos..self.pos + len].copy_from_slice(s.as_bytes());
self.pos += len;
Ok(())
}
}
/// Internal use only! unsafe
pub(crate) struct SharedMessagePool(RefCell<VecDeque<Rc<HttpInnerMessage>>>);
@@ -202,7 +144,7 @@ pub(crate) fn write_status_line(version: Version, mut n: u16, bytes: &mut BytesM
}
}
bytes.extend_from_slice(&buf);
bytes.put_slice(&buf);
if four {
bytes.put(b' ');
}
@@ -214,7 +156,7 @@ pub(crate) fn write_content_length(mut n: usize, bytes: &mut BytesMut) {
b'n',b't',b'-',b'l',b'e',b'n',b'g',
b't',b'h',b':',b' ',b'0',b'\r',b'\n'];
buf[18] = (n as u8) + b'0';
bytes.extend_from_slice(&buf);
bytes.put_slice(&buf);
} else if n < 100 {
let mut buf: [u8; 22] = [b'\r',b'\n',b'c',b'o',b'n',b't',b'e',
b'n',b't',b'-',b'l',b'e',b'n',b'g',
@@ -224,7 +166,7 @@ pub(crate) fn write_content_length(mut n: usize, bytes: &mut BytesMut) {
ptr::copy_nonoverlapping(
DEC_DIGITS_LUT.as_ptr().offset(d1 as isize), buf.as_mut_ptr().offset(18), 2);
}
bytes.extend_from_slice(&buf);
bytes.put_slice(&buf);
} else if n < 1000 {
let mut buf: [u8; 23] = [b'\r',b'\n',b'c',b'o',b'n',b't',b'e',
b'n',b't',b'-',b'l',b'e',b'n',b'g',
@@ -238,9 +180,9 @@ pub(crate) fn write_content_length(mut n: usize, bytes: &mut BytesMut) {
// decode last 1
buf[18] = (n as u8) + b'0';
bytes.extend_from_slice(&buf);
bytes.put_slice(&buf);
} else {
bytes.extend_from_slice(b"\r\ncontent-length: ");
bytes.put_slice(b"\r\ncontent-length: ");
convert_usize(n, bytes);
}
}
@@ -299,20 +241,6 @@ pub(crate) fn convert_usize(mut n: usize, bytes: &mut BytesMut) {
mod tests {
use super::*;
#[test]
fn test_date_len() {
assert_eq!(DATE_VALUE_LENGTH, "Sun, 06 Nov 1994 08:49:37 GMT".len());
}
#[test]
fn test_date() {
let mut buf1 = BytesMut::new();
date(&mut buf1);
let mut buf2 = BytesMut::new();
date(&mut buf2);
assert_eq!(buf1, buf2);
}
#[test]
fn test_write_content_length() {
let mut bytes = BytesMut::new();

View File

@@ -203,11 +203,10 @@ impl<S> HttpRequest<S> {
#[inline]
pub fn uri(&self) -> &Uri { &self.as_ref().uri }
#[doc(hidden)]
#[inline]
/// Modify the Request Uri.
/// Returns mutable the Request Uri.
///
/// This might be useful for middlewares, i.e. path normalization
/// This might be useful for middlewares, e.g. path normalization.
#[inline]
pub fn uri_mut(&mut self) -> &mut Uri {
&mut self.as_mut().uri
}
@@ -222,7 +221,9 @@ impl<S> HttpRequest<S> {
self.as_ref().version
}
#[doc(hidden)]
///Returns mutable Request's headers.
///
///This is intended to be used by middleware.
#[inline]
pub fn headers_mut(&mut self) -> &mut HeaderMap {
&mut self.as_mut().headers
@@ -389,6 +390,15 @@ impl<S> HttpRequest<S> {
self.as_ref().method == Method::CONNECT
}
/// Set read buffer capacity
///
/// Default buffer capacity is 32Kb.
pub fn set_read_buffer_capacity(&mut self, cap: usize) {
if let Some(ref mut payload) = self.as_mut().payload {
payload.set_read_buffer_capacity(cap)
}
}
#[cfg(test)]
pub(crate) fn payload(&self) -> &Payload {
let msg = self.as_mut();

View File

@@ -1,7 +1,8 @@
//! Http response
use std::{mem, str, fmt};
use std::rc::Rc;
use std::io::Write;
use std::cell::RefCell;
use std::cell::UnsafeCell;
use std::collections::VecDeque;
use cookie::{Cookie, CookieJar};
@@ -34,12 +35,12 @@ pub enum ConnectionType {
}
/// An HTTP Response
pub struct HttpResponse(Option<Box<InnerHttpResponse>>);
pub struct HttpResponse(Option<Box<InnerHttpResponse>>, Rc<UnsafeCell<Pool>>);
impl Drop for HttpResponse {
fn drop(&mut self) {
if let Some(inner) = self.0.take() {
Pool::release(inner)
Pool::release(&self.1, inner)
}
}
}
@@ -61,8 +62,10 @@ impl HttpResponse {
/// Create http response builder with specific status.
#[inline]
pub fn build(status: StatusCode) -> HttpResponseBuilder {
let (msg, pool) = Pool::get(status);
HttpResponseBuilder {
response: Some(Pool::get(status)),
response: Some(msg),
pool: Some(pool),
err: None,
cookies: None,
}
@@ -71,7 +74,8 @@ impl HttpResponse {
/// Constructs a response
#[inline]
pub fn new(status: StatusCode, body: Body) -> HttpResponse {
HttpResponse(Some(Pool::with_body(status, body)))
let (msg, pool) = Pool::with_body(status, body);
HttpResponse(Some(msg), pool)
}
/// Constructs a error response
@@ -232,9 +236,9 @@ impl fmt::Debug for HttpResponse {
///
/// This type can be used to construct an instance of `HttpResponse` through a
/// builder-like pattern.
#[derive(Debug)]
pub struct HttpResponseBuilder {
response: Option<Box<InnerHttpResponse>>,
pool: Option<Rc<UnsafeCell<Pool>>>,
err: Option<HttpError>,
cookies: Option<CookieJar>,
}
@@ -506,7 +510,7 @@ impl HttpResponseBuilder {
}
}
response.body = body.into();
Ok(HttpResponse(Some(response)))
Ok(HttpResponse(Some(response), self.pool.take().unwrap()))
}
/// Set a streaming body and generate `HttpResponse`.
@@ -547,6 +551,7 @@ impl HttpResponseBuilder {
pub fn take(&mut self) -> HttpResponseBuilder {
HttpResponseBuilder {
response: self.response.take(),
pool: self.pool.take(),
err: self.err.take(),
cookies: self.cookies.take(),
}
@@ -748,55 +753,56 @@ impl InnerHttpResponse {
/// Internal use only! unsafe
struct Pool(VecDeque<Box<InnerHttpResponse>>);
thread_local!(static POOL: RefCell<Pool> =
RefCell::new(Pool(VecDeque::with_capacity(128))));
thread_local!(static POOL: Rc<UnsafeCell<Pool>> =
Rc::new(UnsafeCell::new(Pool(VecDeque::with_capacity(128)))));
impl Pool {
#[inline]
fn get(status: StatusCode) -> Box<InnerHttpResponse> {
fn get(status: StatusCode) -> (Box<InnerHttpResponse>, Rc<UnsafeCell<Pool>>) {
POOL.with(|pool| {
if let Some(mut resp) = pool.borrow_mut().0.pop_front() {
let p = unsafe{&mut *pool.as_ref().get()};
if let Some(mut resp) = p.0.pop_front() {
resp.body = Body::Empty;
resp.status = status;
resp
(resp, Rc::clone(pool))
} else {
Box::new(InnerHttpResponse::new(status, Body::Empty))
(Box::new(InnerHttpResponse::new(status, Body::Empty)), Rc::clone(pool))
}
})
}
#[inline]
fn with_body(status: StatusCode, body: Body) -> Box<InnerHttpResponse> {
fn with_body(status: StatusCode, body: Body)
-> (Box<InnerHttpResponse>, Rc<UnsafeCell<Pool>>) {
POOL.with(|pool| {
if let Some(mut resp) = pool.borrow_mut().0.pop_front() {
let p = unsafe{&mut *pool.as_ref().get()};
if let Some(mut resp) = p.0.pop_front() {
resp.status = status;
resp.body = body;
resp
(resp, Rc::clone(pool))
} else {
Box::new(InnerHttpResponse::new(status, body))
(Box::new(InnerHttpResponse::new(status, body)), Rc::clone(pool))
}
})
}
#[inline(always)]
#[cfg_attr(feature = "cargo-clippy", allow(boxed_local, inline_always))]
fn release(mut inner: Box<InnerHttpResponse>) {
POOL.with(|pool| {
let v = &mut pool.borrow_mut().0;
if v.len() < 128 {
inner.headers.clear();
inner.version = None;
inner.chunked = None;
inner.reason = None;
inner.encoding = None;
inner.connection_type = None;
inner.response_size = 0;
inner.error = None;
inner.write_capacity = MAX_WRITE_BUFFER_SIZE;
v.push_front(inner);
}
})
fn release(pool: &Rc<UnsafeCell<Pool>>, mut inner: Box<InnerHttpResponse>) {
let pool = unsafe{&mut *pool.as_ref().get()};
if pool.0.len() < 128 {
inner.headers.clear();
inner.version = None;
inner.chunked = None;
inner.reason = None;
inner.encoding = None;
inner.connection_type = None;
inner.response_size = 0;
inner.error = None;
inner.write_capacity = MAX_WRITE_BUFFER_SIZE;
pool.0.push_front(inner);
}
}
}

View File

@@ -79,6 +79,7 @@ extern crate libc;
extern crate serde;
extern crate serde_json;
extern crate flate2;
#[cfg(feature="brotli")]
extern crate brotli2;
extern crate encoding;
extern crate percent_encoding;

View File

@@ -4,9 +4,10 @@ use std::path::PathBuf;
use std::str::FromStr;
use std::slice::Iter;
use std::borrow::Cow;
use http::StatusCode;
use smallvec::SmallVec;
use error::{ResponseError, UriSegmentError, InternalError, ErrorBadRequest};
use error::{ResponseError, UriSegmentError, InternalError};
/// A trait to abstract the idea of creating a new instance of a type from a path parameter.
@@ -144,7 +145,8 @@ macro_rules! FROM_STR {
type Err = InternalError<<$type as FromStr>::Err>;
fn from_param(val: &str) -> Result<Self, Self::Err> {
<$type as FromStr>::from_str(val).map_err(ErrorBadRequest)
<$type as FromStr>::from_str(val)
.map_err(|e| InternalError::new(e, StatusCode::BAD_REQUEST))
}
}
}

View File

@@ -5,9 +5,14 @@ use std::cell::RefCell;
use std::collections::VecDeque;
use bytes::{Bytes, BytesMut};
use futures::{Async, Poll, Stream};
use futures::task::{Task, current as current_task};
use error::PayloadError;
/// max buffer size 32k
pub(crate) const MAX_BUFFER_SIZE: usize = 32_768;
#[derive(Debug, PartialEq)]
pub(crate) enum PayloadStatus {
Read,
@@ -76,6 +81,14 @@ impl Payload {
pub(crate) fn readall(&self) -> Option<Bytes> {
self.inner.borrow_mut().readall()
}
#[inline]
/// Set read buffer capacity
///
/// Default buffer capacity is 32Kb.
pub fn set_read_buffer_capacity(&mut self, cap: usize) {
self.inner.borrow_mut().capacity = cap;
}
}
impl Stream for Payload {
@@ -146,6 +159,12 @@ impl PayloadWriter for PayloadSender {
if shared.borrow().need_read {
PayloadStatus::Read
} else {
#[cfg(not(test))]
{
if shared.borrow_mut().io_task.is_none() {
shared.borrow_mut().io_task = Some(current_task());
}
}
PayloadStatus::Pause
}
} else {
@@ -161,6 +180,9 @@ struct Inner {
err: Option<PayloadError>,
need_read: bool,
items: VecDeque<Bytes>,
capacity: usize,
task: Option<Task>,
io_task: Option<Task>,
}
impl Inner {
@@ -172,6 +194,9 @@ impl Inner {
err: None,
items: VecDeque::new(),
need_read: true,
capacity: MAX_BUFFER_SIZE,
task: None,
io_task: None,
}
}
@@ -188,8 +213,11 @@ impl Inner {
#[inline]
fn feed_data(&mut self, data: Bytes) {
self.len += data.len();
self.need_read = false;
self.items.push_back(data);
self.need_read = self.len < self.capacity;
if let Some(task) = self.task.take() {
task.notify()
}
}
#[inline]
@@ -222,6 +250,16 @@ impl Inner {
fn readany(&mut self) -> Poll<Option<Bytes>, PayloadError> {
if let Some(data) = self.items.pop_front() {
self.len -= data.len();
self.need_read = self.len < self.capacity;
#[cfg(not(test))]
{
if self.need_read && self.task.is_none() {
self.task = Some(current_task());
}
if let Some(task) = self.io_task.take() {
task.notify()
}
}
Ok(Async::Ready(Some(data)))
} else if let Some(err) = self.err.take() {
Err(err)
@@ -229,6 +267,15 @@ impl Inner {
Ok(Async::Ready(None))
} else {
self.need_read = true;
#[cfg(not(test))]
{
if self.task.is_none() {
self.task = Some(current_task());
}
if let Some(task) = self.io_task.take() {
task.notify()
}
}
Ok(Async::NotReady)
}
}

View File

@@ -453,163 +453,171 @@ impl<S: 'static, H> ProcessResponse<S, H> {
fn poll_io(mut self, io: &mut Writer, info: &mut PipelineInfo<S>)
-> Result<PipelineState<S, H>, PipelineState<S, H>>
{
if self.drain.is_none() && self.running != RunningState::Paused {
// if task is paused, write buffer is probably full
'outter: loop {
let result = match mem::replace(&mut self.iostate, IOState::Done) {
IOState::Response => {
let encoding = self.resp.content_encoding().unwrap_or(info.encoding);
loop {
if self.drain.is_none() && self.running != RunningState::Paused {
// if task is paused, write buffer is probably full
'inner: loop {
let result = match mem::replace(&mut self.iostate, IOState::Done) {
IOState::Response => {
let encoding = self.resp.content_encoding().unwrap_or(info.encoding);
let result = match io.start(info.req_mut().get_inner(),
&mut self.resp, encoding)
{
Ok(res) => res,
Err(err) => {
info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(info, self.resp))
}
};
if let Some(err) = self.resp.error() {
if self.resp.status().is_server_error() {
error!("Error occured during request handling: {}", err);
} else {
warn!("Error occured during request handling: {}", err);
}
if log_enabled!(Debug) {
debug!("{:?}", err);
}
}
match self.resp.replace_body(Body::Empty) {
Body::Streaming(stream) =>
self.iostate = IOState::Payload(stream),
Body::Actor(ctx) =>
self.iostate = IOState::Actor(ctx),
_ => (),
}
result
},
IOState::Payload(mut body) => {
match body.poll() {
Ok(Async::Ready(None)) => {
if let Err(err) = io.write_eof() {
let result = match io.start(info.req_mut().get_inner(),
&mut self.resp, encoding)
{
Ok(res) => res,
Err(err) => {
info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(info, self.resp))
}
break
};
if let Some(err) = self.resp.error() {
if self.resp.status().is_server_error() {
error!("Error occured during request handling: {}", err);
} else {
warn!("Error occured during request handling: {}", err);
}
if log_enabled!(Debug) {
debug!("{:?}", err);
}
}
// always poll stream or actor for the first time
match self.resp.replace_body(Body::Empty) {
Body::Streaming(stream) => {
self.iostate = IOState::Payload(stream);
continue 'inner
},
Body::Actor(ctx) => {
self.iostate = IOState::Actor(ctx);
continue 'inner
},
Ok(Async::Ready(Some(chunk))) => {
self.iostate = IOState::Payload(body);
match io.write(chunk.into()) {
Err(err) => {
_ => (),
}
result
},
IOState::Payload(mut body) => {
match body.poll() {
Ok(Async::Ready(None)) => {
if let Err(err) = io.write_eof() {
info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(info, self.resp))
},
Ok(result) => result
}
}
Ok(Async::NotReady) => {
self.iostate = IOState::Payload(body);
break
},
Err(err) => {
info.error = Some(err);
return Ok(FinishingMiddlewares::init(info, self.resp))
}
}
},
IOState::Actor(mut ctx) => {
if info.disconnected.take().is_some() {
ctx.disconnected();
}
match ctx.poll() {
Ok(Async::Ready(Some(vec))) => {
if vec.is_empty() {
self.iostate = IOState::Actor(ctx);
}
break
}
let mut res = None;
for frame in vec {
match frame {
Frame::Chunk(None) => {
info.context = Some(ctx);
if let Err(err) = io.write_eof() {
info.error = Some(err.into());
return Ok(
FinishingMiddlewares::init(info, self.resp))
}
break 'outter
},
Ok(Async::Ready(Some(chunk))) => {
self.iostate = IOState::Payload(body);
match io.write(chunk.into()) {
Err(err) => {
info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(info, self.resp))
},
Frame::Chunk(Some(chunk)) => {
match io.write(chunk) {
Err(err) => {
Ok(result) => result
}
}
Ok(Async::NotReady) => {
self.iostate = IOState::Payload(body);
break
},
Err(err) => {
info.error = Some(err);
return Ok(FinishingMiddlewares::init(info, self.resp))
}
}
},
IOState::Actor(mut ctx) => {
if info.disconnected.take().is_some() {
ctx.disconnected();
}
match ctx.poll() {
Ok(Async::Ready(Some(vec))) => {
if vec.is_empty() {
self.iostate = IOState::Actor(ctx);
break
}
let mut res = None;
for frame in vec {
match frame {
Frame::Chunk(None) => {
info.context = Some(ctx);
if let Err(err) = io.write_eof() {
info.error = Some(err.into());
return Ok(
FinishingMiddlewares::init(info, self.resp))
},
Ok(result) => res = Some(result),
}
},
Frame::Drain(fut) =>
self.drain = Some(fut),
}
break 'inner
},
Frame::Chunk(Some(chunk)) => {
match io.write(chunk) {
Err(err) => {
info.error = Some(err.into());
return Ok(
FinishingMiddlewares::init(info, self.resp))
},
Ok(result) => res = Some(result),
}
},
Frame::Drain(fut) => self.drain = Some(fut),
}
}
self.iostate = IOState::Actor(ctx);
if self.drain.is_some() {
self.running.resume();
break 'inner
}
res.unwrap()
},
Ok(Async::Ready(None)) => {
break
}
self.iostate = IOState::Actor(ctx);
if self.drain.is_some() {
self.running.resume();
break 'outter
Ok(Async::NotReady) => {
self.iostate = IOState::Actor(ctx);
break
}
Err(err) => {
info.error = Some(err);
return Ok(FinishingMiddlewares::init(info, self.resp))
}
res.unwrap()
},
Ok(Async::Ready(None)) => {
break
}
Ok(Async::NotReady) => {
self.iostate = IOState::Actor(ctx);
break
}
Err(err) => {
info.error = Some(err);
return Ok(FinishingMiddlewares::init(info, self.resp))
}
}
}
IOState::Done => break,
};
IOState::Done => break,
};
match result {
WriterState::Pause => {
self.running.pause();
break
match result {
WriterState::Pause => {
self.running.pause();
break
}
WriterState::Done => {
self.running.resume()
},
}
WriterState::Done => {
self.running.resume()
}
}
// flush io but only if we need to
if self.running == RunningState::Paused || self.drain.is_some() {
match io.poll_completed(false) {
Ok(Async::Ready(_)) => {
self.running.resume();
// resolve drain futures
if let Some(tx) = self.drain.take() {
let _ = tx.send(());
}
// restart io processing
continue
},
}
}
}
// flush io but only if we need to
if self.running == RunningState::Paused || self.drain.is_some() {
match io.poll_completed(false) {
Ok(Async::Ready(_)) => {
self.running.resume();
// resolve drain futures
if let Some(tx) = self.drain.take() {
let _ = tx.send(());
Ok(Async::NotReady) =>
return Err(PipelineState::Response(self)),
Err(err) => {
info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(info, self.resp))
}
// restart io processing
return self.poll_io(io, info);
},
Ok(Async::NotReady) => return Err(PipelineState::Response(self)),
Err(err) => {
info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(info, self.resp))
}
}
break
}
// response is completed

View File

@@ -3,6 +3,7 @@ use std::io::{Read, Write};
use std::fmt::Write as FmtWrite;
use std::str::FromStr;
use bytes::{Bytes, BytesMut, BufMut};
use http::{Version, Method, HttpTryFrom};
use http::header::{HeaderMap, HeaderValue,
ACCEPT_ENCODING, CONNECTION,
@@ -10,8 +11,8 @@ use http::header::{HeaderMap, HeaderValue,
use flate2::Compression;
use flate2::read::GzDecoder;
use flate2::write::{GzEncoder, DeflateDecoder, DeflateEncoder};
#[cfg(feature="brotli")]
use brotli2::write::{BrotliDecoder, BrotliEncoder};
use bytes::{Bytes, BytesMut, BufMut};
use header::ContentEncoding;
use body::{Body, Binary};
@@ -144,6 +145,7 @@ impl PayloadWriter for EncodedPayload {
pub(crate) enum Decoder {
Deflate(Box<DeflateDecoder<Writer>>),
Gzip(Option<Box<GzDecoder<Wrapper>>>),
#[cfg(feature="brotli")]
Br(Box<BrotliDecoder<Writer>>),
Identity,
}
@@ -214,6 +216,7 @@ pub(crate) struct PayloadStream {
impl PayloadStream {
pub fn new(enc: ContentEncoding) -> PayloadStream {
let dec = match enc {
#[cfg(feature="brotli")]
ContentEncoding::Br => Decoder::Br(
Box::new(BrotliDecoder::new(Writer::new()))),
ContentEncoding::Deflate => Decoder::Deflate(
@@ -229,6 +232,7 @@ impl PayloadStream {
pub fn feed_eof(&mut self) -> io::Result<Option<Bytes>> {
match self.decoder {
#[cfg(feature="brotli")]
Decoder::Br(ref mut decoder) => {
match decoder.finish() {
Ok(mut writer) => {
@@ -278,6 +282,7 @@ impl PayloadStream {
pub fn feed_data(&mut self, data: Bytes) -> io::Result<Option<Bytes>> {
match self.decoder {
#[cfg(feature="brotli")]
Decoder::Br(ref mut decoder) => {
match decoder.write_all(&data) {
Ok(_) => {
@@ -346,6 +351,7 @@ impl PayloadStream {
pub(crate) enum ContentEncoder {
Deflate(DeflateEncoder<TransferEncoding>),
Gzip(GzEncoder<TransferEncoding>),
#[cfg(feature="brotli")]
Br(BrotliEncoder<TransferEncoding>),
Identity(TransferEncoding),
}
@@ -362,6 +368,7 @@ impl ContentEncoder {
response_encoding: ContentEncoding) -> ContentEncoder
{
let version = resp.version().unwrap_or_else(|| req.version);
let is_head = req.method == Method::HEAD;
let mut body = resp.replace_body(Body::Empty);
let has_body = match body {
Body::Empty => false,
@@ -404,7 +411,9 @@ impl ContentEncoder {
TransferEncoding::length(0, buf)
},
Body::Binary(ref mut bytes) => {
if encoding.is_compression() {
if !(encoding == ContentEncoding::Identity
|| encoding == ContentEncoding::Auto)
{
let tmp = SharedBytes::default();
let transfer = TransferEncoding::eof(tmp.clone());
let mut enc = match encoding {
@@ -412,6 +421,7 @@ impl ContentEncoder {
DeflateEncoder::new(transfer, Compression::default())),
ContentEncoding::Gzip => ContentEncoder::Gzip(
GzEncoder::new(transfer, Compression::default())),
#[cfg(feature="brotli")]
ContentEncoding::Br => ContentEncoder::Br(
BrotliEncoder::new(transfer, 5)),
ContentEncoding::Identity => ContentEncoder::Identity(transfer),
@@ -424,13 +434,13 @@ impl ContentEncoder {
*bytes = Binary::from(tmp.take());
encoding = ContentEncoding::Identity;
}
if req.method == Method::HEAD {
if is_head {
let mut b = BytesMut::new();
let _ = write!(b, "{}", bytes.len());
resp.headers_mut().insert(
CONTENT_LENGTH, HeaderValue::try_from(b.freeze()).unwrap());
} else {
resp.headers_mut().remove(CONTENT_LENGTH);
// resp.headers_mut().remove(CONTENT_LENGTH);
}
TransferEncoding::eof(buf)
}
@@ -453,7 +463,7 @@ impl ContentEncoder {
}
};
//
if req.method == Method::HEAD {
if is_head {
transfer.kind = TransferEncodingKind::Length(0);
} else {
resp.replace_body(body);
@@ -464,6 +474,7 @@ impl ContentEncoder {
DeflateEncoder::new(transfer, Compression::default())),
ContentEncoding::Gzip => ContentEncoder::Gzip(
GzEncoder::new(transfer, Compression::default())),
#[cfg(feature="brotli")]
ContentEncoding::Br => ContentEncoder::Br(
BrotliEncoder::new(transfer, 5)),
ContentEncoding::Identity | ContentEncoding::Auto =>
@@ -538,6 +549,7 @@ impl ContentEncoder {
#[inline]
pub fn is_eof(&self) -> bool {
match *self {
#[cfg(feature="brotli")]
ContentEncoder::Br(ref encoder) => encoder.get_ref().is_eof(),
ContentEncoder::Deflate(ref encoder) => encoder.get_ref().is_eof(),
ContentEncoder::Gzip(ref encoder) => encoder.get_ref().is_eof(),
@@ -552,6 +564,7 @@ impl ContentEncoder {
self, ContentEncoder::Identity(TransferEncoding::eof(SharedBytes::empty())));
match encoder {
#[cfg(feature="brotli")]
ContentEncoder::Br(encoder) => {
match encoder.finish() {
Ok(mut writer) => {
@@ -594,6 +607,7 @@ impl ContentEncoder {
#[inline(always)]
pub fn write(&mut self, data: Binary) -> Result<(), io::Error> {
match *self {
#[cfg(feature="brotli")]
ContentEncoder::Br(ref mut encoder) => {
match encoder.write_all(data.as_ref()) {
Ok(_) => Ok(()),

View File

@@ -32,8 +32,10 @@ const MAX_PIPELINED_MESSAGES: usize = 16;
bitflags! {
struct Flags: u8 {
const STARTED = 0b0000_0001;
const ERROR = 0b0000_0010;
const KEEPALIVE = 0b0000_0100;
const SHUTDOWN = 0b0000_1000;
}
}
@@ -49,7 +51,7 @@ pub(crate) struct Http1<T: IoStream, H: 'static> {
flags: Flags,
settings: Rc<WorkerSettings<H>>,
addr: Option<SocketAddr>,
stream: H1Writer<T>,
stream: H1Writer<T, H>,
reader: Reader,
read_buf: BytesMut,
tasks: VecDeque<Entry>,
@@ -70,7 +72,7 @@ impl<T, H> Http1<T, H>
{
let bytes = settings.get_shared_bytes();
Http1{ flags: Flags::KEEPALIVE,
stream: H1Writer::new(stream, bytes),
stream: H1Writer::new(stream, bytes, Rc::clone(&settings)),
reader: Reader::new(),
tasks: VecDeque::new(),
keepalive_timer: None,
@@ -94,17 +96,32 @@ impl<T, H> Http1<T, H>
match timer.poll() {
Ok(Async::Ready(_)) => {
trace!("Keep-alive timeout, close connection");
return Ok(Async::Ready(()))
self.flags.insert(Flags::SHUTDOWN);
}
Ok(Async::NotReady) => (),
Err(_) => unreachable!(),
}
}
// shutdown
if self.flags.contains(Flags::SHUTDOWN) {
match self.stream.poll_completed(true) {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(_)) => return Ok(Async::Ready(())),
Err(err) => {
debug!("Error sending data: {}", err);
return Err(())
}
}
}
loop {
match self.poll_io()? {
Async::Ready(true) => (),
Async::Ready(false) => return Ok(Async::Ready(())),
Async::Ready(false) => {
self.flags.insert(Flags::SHUTDOWN);
return self.poll()
},
Async::NotReady => return Ok(Async::NotReady),
}
}
@@ -120,6 +137,8 @@ impl<T, H> Http1<T, H>
match self.reader.parse(self.stream.get_mut(),
&mut self.read_buf, &self.settings) {
Ok(Async::Ready(mut req)) => {
self.flags.insert(Flags::STARTED);
// set remote addr
req.set_peer_addr(self.addr);
@@ -260,21 +279,24 @@ impl<T, H> Http1<T, H>
}
// check stream state
match self.stream.poll_completed(self.tasks.is_empty()) {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => {
debug!("Error sending data: {}", err);
return Err(())
if self.flags.contains(Flags::STARTED) {
match self.stream.poll_completed(false) {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => {
debug!("Error sending data: {}", err);
return Err(())
}
_ => (),
}
_ => (),
}
// deal with keep-alive
if self.tasks.is_empty() {
// no keep-alive situations
if self.flags.contains(Flags::ERROR)
if (self.flags.contains(Flags::ERROR)
|| !self.flags.contains(Flags::KEEPALIVE)
|| !self.settings.keep_alive_enabled()
|| !self.settings.keep_alive_enabled()) &&
self.flags.contains(Flags::STARTED)
{
return Ok(Async::Ready(false))
}
@@ -331,7 +353,7 @@ impl Reader {
PayloadStatus::Read
}
}
#[inline]
fn decode(&mut self, buf: &mut BytesMut, payload: &mut PayloadInfo)
-> Result<Decoding, ReaderError>
@@ -468,6 +490,8 @@ impl Reader {
fn parse_message<H>(buf: &mut BytesMut, settings: &WorkerSettings<H>)
-> Poll<(HttpRequest, Option<PayloadInfo>), ParseError> {
// Parse http message
let mut has_te = false;
let mut has_upgrade = false;
let msg = {
let bytes_ptr = buf.as_ref().as_ptr() as usize;
let mut headers: [httparse::Header; MAX_HEADERS] =
@@ -478,13 +502,9 @@ impl Reader {
let mut req = httparse::Request::new(&mut headers);
match req.parse(b)? {
httparse::Status::Complete(len) => {
let method = Method::try_from(req.method.unwrap())
let method = Method::from_bytes(req.method.unwrap().as_bytes())
.map_err(|_| ParseError::Method)?;
let path = req.path.unwrap();
let path_start = path.as_ptr() as usize - bytes_ptr;
let path_end = path_start + path.len();
let path = (path_start, path_end);
let path = Uri::try_from(req.path.unwrap()).unwrap();
let version = if req.version.unwrap() == 1 {
Version::HTTP_11
} else {
@@ -500,28 +520,33 @@ impl Reader {
// convert headers
let msg = settings.get_http_message();
for header in headers[..headers_len].iter() {
if let Ok(name) = HeaderName::try_from(header.name) {
let v_start = header.value.as_ptr() as usize - bytes_ptr;
let v_end = v_start + header.value.len();
let value = unsafe {
HeaderValue::from_shared_unchecked(slice.slice(v_start, v_end)) };
msg.get_mut().headers.append(name, value);
} else {
return Err(ParseError::Header)
{
let msg_mut = msg.get_mut();
for header in headers[..headers_len].iter() {
if let Ok(name) = HeaderName::from_bytes(header.name.as_bytes()) {
has_te = has_te || name == header::TRANSFER_ENCODING;
has_upgrade = has_upgrade || name == header::UPGRADE;
let v_start = header.value.as_ptr() as usize - bytes_ptr;
let v_end = v_start + header.value.len();
let value = unsafe {
HeaderValue::from_shared_unchecked(
slice.slice(v_start, v_end)) };
msg_mut.headers.append(name, value);
} else {
return Err(ParseError::Header)
}
}
msg_mut.uri = path;
msg_mut.method = method;
msg_mut.version = version;
}
let path = slice.slice(path.0, path.1);
let uri = Uri::from_shared(path).map_err(ParseError::Uri)?;
msg.get_mut().uri = uri;
msg.get_mut().method = method;
msg.get_mut().version = version;
msg
};
let decoder = if let Some(len) = msg.get_ref().headers.get(header::CONTENT_LENGTH) {
let decoder = if let Some(len) =
msg.get_ref().headers.get(header::CONTENT_LENGTH)
{
// Content-Length
if let Ok(s) = len.to_str() {
if let Ok(len) = s.parse::<u64>() {
@@ -534,12 +559,10 @@ impl Reader {
debug!("illegal Content-Length: {:?}", len);
return Err(ParseError::Header)
}
} else if chunked(&msg.get_mut().headers)? {
} else if has_te && chunked(&msg.get_mut().headers)? {
// Chunked encoding
Some(Decoder::chunked())
} else if msg.get_ref().headers.contains_key(header::UPGRADE) ||
msg.get_ref().method == Method::CONNECT
{
} else if has_upgrade || msg.get_ref().method == Method::CONNECT {
Some(Decoder::eof())
} else {
None
@@ -548,7 +571,7 @@ impl Reader {
if let Some(decoder) = decoder {
let (psender, payload) = Payload::new(false);
let info = PayloadInfo {
tx: PayloadType::new(&msg.get_mut().headers, psender),
tx: PayloadType::new(&msg.get_ref().headers, psender),
decoder,
};
msg.get_mut().payload = Some(payload);
@@ -1396,6 +1419,7 @@ mod tests {
let mut reader = Reader::new();
let mut req = reader_parse_ready!(reader.parse(&mut buf, &mut readbuf, &settings));
let _ = req.payload_mut().set_read_buffer_capacity(0);
assert!(req.chunked().unwrap());
assert!(!req.payload().eof());

View File

@@ -1,11 +1,12 @@
#![cfg_attr(feature = "cargo-clippy", allow(redundant_field_names))]
use std::{io, mem};
use std::rc::Rc;
use bytes::BufMut;
use futures::{Async, Poll};
use tokio_io::AsyncWrite;
use http::{Method, Version};
use http::header::{HeaderValue, CONNECTION, DATE};
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE};
use helpers;
use body::{Body, Binary};
@@ -15,6 +16,7 @@ use httpresponse::HttpResponse;
use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE};
use super::shared::SharedBytes;
use super::encoding::ContentEncoder;
use super::settings::WorkerSettings;
const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific
@@ -27,7 +29,7 @@ bitflags! {
}
}
pub(crate) struct H1Writer<T: AsyncWrite> {
pub(crate) struct H1Writer<T: AsyncWrite, H: 'static> {
flags: Flags,
stream: T,
encoder: ContentEncoder,
@@ -35,11 +37,14 @@ pub(crate) struct H1Writer<T: AsyncWrite> {
headers_size: u32,
buffer: SharedBytes,
buffer_capacity: usize,
settings: Rc<WorkerSettings<H>>,
}
impl<T: AsyncWrite> H1Writer<T> {
impl<T: AsyncWrite, H: 'static> H1Writer<T, H> {
pub fn new(stream: T, buf: SharedBytes) -> H1Writer<T> {
pub fn new(stream: T, buf: SharedBytes, settings: Rc<WorkerSettings<H>>)
-> H1Writer<T, H>
{
H1Writer {
flags: Flags::empty(),
encoder: ContentEncoder::empty(buf.clone()),
@@ -48,6 +53,7 @@ impl<T: AsyncWrite> H1Writer<T> {
buffer: buf,
buffer_capacity: 0,
stream,
settings,
}
}
@@ -76,7 +82,9 @@ impl<T: AsyncWrite> H1Writer<T> {
self.disconnected();
return Err(io::Error::new(io::ErrorKind::WriteZero, ""))
},
Ok(n) => written += n,
Ok(n) => {
written += n;
},
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
return Ok(written)
}
@@ -87,7 +95,7 @@ impl<T: AsyncWrite> H1Writer<T> {
}
}
impl<T: AsyncWrite> Writer for H1Writer<T> {
impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
#[inline]
fn written(&self) -> u64 {
@@ -126,11 +134,14 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
// render message
{
let mut buffer = self.buffer.get_mut();
if let Body::Binary(ref bytes) = body {
buffer.reserve(256 + msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len());
let mut is_bin = if let Body::Binary(ref bytes) = body {
buffer.reserve(
256 + msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len());
true
} else {
buffer.reserve(256 + msg.headers().len() * AVERAGE_HEADER_SIZE);
}
false
};
// status line
helpers::write_status_line(version, msg.status().as_u16(), &mut buffer);
@@ -139,21 +150,28 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
match body {
Body::Empty =>
if req.method != Method::HEAD {
SharedBytes::extend_from_slice_(buffer, b"\r\ncontent-length: 0\r\n");
SharedBytes::put_slice(
buffer, b"\r\ncontent-length: 0\r\n");
} else {
SharedBytes::extend_from_slice_(buffer, b"\r\n");
SharedBytes::put_slice(buffer, b"\r\n");
},
Body::Binary(ref bytes) =>
helpers::write_content_length(bytes.len(), &mut buffer),
_ =>
SharedBytes::extend_from_slice_(buffer, b"\r\n"),
SharedBytes::put_slice(buffer, b"\r\n"),
}
// write headers
let mut pos = 0;
let mut has_date = false;
let mut remaining = buffer.remaining_mut();
let mut buf: &mut [u8] = unsafe{ mem::transmute(buffer.bytes_mut()) };
for (key, value) in msg.headers() {
if is_bin && key == CONTENT_LENGTH {
is_bin = false;
continue
}
has_date = has_date || key == DATE;
let v = value.as_ref();
let k = key.as_str().as_bytes();
let len = k.len() + v.len() + 4;
@@ -182,9 +200,9 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
}
unsafe{buffer.advance_mut(pos)};
// using helpers::date is quite a lot faster
if !msg.headers().contains_key(DATE) {
helpers::date(&mut buffer);
// optimized date header
if !has_date {
self.settings.set_date(&mut buffer);
} else {
// msg eof
SharedBytes::extend_from_slice_(buffer, b"\r\n");
@@ -211,9 +229,10 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
// shortcut for upgraded connection
if self.flags.contains(Flags::UPGRADE) {
if self.buffer.is_empty() {
let n = self.write_data(payload.as_ref())?;
if payload.len() < n {
self.buffer.extend_from_slice(&payload.as_ref()[n..]);
let pl: &[u8] = payload.as_ref();
let n = self.write_data(pl)?;
if n < pl.len() {
self.buffer.extend_from_slice(&pl[n..]);
return Ok(WriterState::Done);
}
} else {
@@ -224,12 +243,12 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
self.encoder.write(payload)?;
}
} else {
// might be response to EXCEPT
// could be response to EXCEPT header
self.buffer.extend_from_slice(payload.as_ref())
}
}
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
if self.buffer.len() > self.buffer_capacity {
Ok(WriterState::Pause)
} else {
Ok(WriterState::Done)

View File

@@ -43,7 +43,7 @@ struct Http2<T, H>
settings: Rc<WorkerSettings<H>>,
addr: Option<SocketAddr>,
state: State<IoWrapper<T>>,
tasks: VecDeque<Entry>,
tasks: VecDeque<Entry<H>>,
keepalive_timer: Option<Timeout>,
}
@@ -274,20 +274,20 @@ bitflags! {
}
}
struct Entry {
struct Entry<H: 'static> {
task: Box<HttpHandlerTask>,
payload: PayloadType,
recv: RecvStream,
stream: H2Writer,
stream: H2Writer<H>,
flags: EntryFlags,
}
impl Entry {
fn new<H>(parts: Parts,
recv: RecvStream,
resp: SendResponse<Bytes>,
addr: Option<SocketAddr>,
settings: &Rc<WorkerSettings<H>>) -> Entry
impl<H: 'static> Entry<H> {
fn new(parts: Parts,
recv: RecvStream,
resp: SendResponse<Bytes>,
addr: Option<SocketAddr>,
settings: &Rc<WorkerSettings<H>>) -> Entry<H>
where H: HttpHandler + 'static
{
// Payload and Content-Encoding
@@ -320,7 +320,8 @@ impl Entry {
Entry {task: task.unwrap_or_else(|| Pipeline::error(HttpNotFound)),
payload: psender,
stream: H2Writer::new(resp, settings.get_shared_bytes()),
stream: H2Writer::new(
resp, settings.get_shared_bytes(), Rc::clone(settings)),
flags: EntryFlags::empty(),
recv,
}

View File

@@ -1,6 +1,7 @@
#![cfg_attr(feature = "cargo-clippy", allow(redundant_field_names))]
use std::{io, cmp};
use std::rc::Rc;
use bytes::{Bytes, BytesMut};
use futures::{Async, Poll};
use http2::{Reason, SendStream};
@@ -15,6 +16,7 @@ use httprequest::HttpInnerMessage;
use httpresponse::HttpResponse;
use super::encoding::ContentEncoder;
use super::shared::SharedBytes;
use super::settings::WorkerSettings;
use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE};
const CHUNK_SIZE: usize = 16_384;
@@ -28,7 +30,7 @@ bitflags! {
}
}
pub(crate) struct H2Writer {
pub(crate) struct H2Writer<H: 'static> {
respond: SendResponse<Bytes>,
stream: Option<SendStream<Bytes>>,
encoder: ContentEncoder,
@@ -36,13 +38,17 @@ pub(crate) struct H2Writer {
written: u64,
buffer: SharedBytes,
buffer_capacity: usize,
settings: Rc<WorkerSettings<H>>,
}
impl H2Writer {
impl<H: 'static> H2Writer<H> {
pub fn new(respond: SendResponse<Bytes>, buf: SharedBytes) -> H2Writer {
pub fn new(respond: SendResponse<Bytes>,
buf: SharedBytes, settings: Rc<WorkerSettings<H>>) -> H2Writer<H>
{
H2Writer {
respond,
settings,
stream: None,
encoder: ContentEncoder::empty(buf.clone()),
flags: Flags::empty(),
@@ -59,7 +65,7 @@ impl H2Writer {
}
}
impl Writer for H2Writer {
impl<H: 'static> Writer for H2Writer<H> {
fn written(&self) -> u64 {
self.written
@@ -84,7 +90,7 @@ impl Writer for H2Writer {
// using helpers::date is quite a lot faster
if !msg.headers().contains_key(DATE) {
let mut bytes = BytesMut::with_capacity(29);
helpers::date_value(&mut bytes);
self.settings.set_date_simple(&mut bytes);
msg.headers_mut().insert(DATE, HeaderValue::try_from(bytes.freeze()).unwrap());
}
@@ -95,7 +101,8 @@ impl Writer for H2Writer {
helpers::convert_usize(bytes.len(), &mut val);
let l = val.len();
msg.headers_mut().insert(
CONTENT_LENGTH, HeaderValue::try_from(val.split_to(l-2).freeze()).unwrap());
CONTENT_LENGTH,
HeaderValue::try_from(val.split_to(l-2).freeze()).unwrap());
}
Body::Empty => {
msg.headers_mut().insert(CONTENT_LENGTH, HeaderValue::from_static("0"));
@@ -197,6 +204,7 @@ impl Writer for H2Writer {
let cap = cmp::min(self.buffer.len(), CHUNK_SIZE);
stream.reserve_capacity(cap);
} else {
self.flags.remove(Flags::RESERVED);
return Ok(Async::NotReady)
}
}
@@ -204,6 +212,6 @@ impl Writer for H2Writer {
}
}
}
return Ok(Async::NotReady)
Ok(Async::NotReady)
}
}

View File

@@ -1,7 +1,10 @@
use std::{fmt, net};
use std::{fmt, mem, net};
use std::fmt::Write;
use std::rc::Rc;
use std::sync::Arc;
use std::cell::{Cell, RefCell, RefMut, UnsafeCell};
use time;
use bytes::BytesMut;
use futures_cpupool::{Builder, CpuPool};
use helpers;
@@ -95,6 +98,8 @@ impl ServerSettings {
}
}
// "Sun, 06 Nov 1994 08:49:37 GMT".len()
const DATE_VALUE_LENGTH: usize = 29;
pub(crate) struct WorkerSettings<H> {
h: RefCell<Vec<H>>,
@@ -104,6 +109,7 @@ pub(crate) struct WorkerSettings<H> {
messages: Rc<helpers::SharedMessagePool>,
channels: Cell<usize>,
node: Box<Node<()>>,
date: UnsafeCell<Date>,
}
impl<H> WorkerSettings<H> {
@@ -121,6 +127,7 @@ impl<H> WorkerSettings<H> {
messages: Rc::new(helpers::SharedMessagePool::new()),
channels: Cell::new(0),
node: Box::new(Node::head()),
date: UnsafeCell::new(Date::new()),
}
}
@@ -164,4 +171,67 @@ impl<H> WorkerSettings<H> {
error!("Number of removed channels is bigger than added channel. Bug in actix-web");
}
}
pub fn update_date(&self) {
unsafe{&mut *self.date.get()}.update();
}
pub fn set_date(&self, dst: &mut BytesMut) {
let mut buf: [u8; 39] = unsafe { mem::uninitialized() };
buf[..6].copy_from_slice(b"date: ");
buf[6..35].copy_from_slice(&(unsafe{&*self.date.get()}.bytes));
buf[35..].copy_from_slice(b"\r\n\r\n");
dst.extend_from_slice(&buf);
}
pub fn set_date_simple(&self, dst: &mut BytesMut) {
dst.extend_from_slice(&(unsafe{&*self.date.get()}.bytes));
}
}
struct Date {
bytes: [u8; DATE_VALUE_LENGTH],
pos: usize,
}
impl Date {
fn new() -> Date {
let mut date = Date{bytes: [0; DATE_VALUE_LENGTH], pos: 0};
date.update();
date
}
fn update(&mut self) {
self.pos = 0;
write!(self, "{}", time::at_utc(time::get_time()).rfc822()).unwrap();
}
}
impl fmt::Write for Date {
fn write_str(&mut self, s: &str) -> fmt::Result {
let len = s.len();
self.bytes[self.pos..self.pos + len].copy_from_slice(s.as_bytes());
self.pos += len;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_date_len() {
assert_eq!(DATE_VALUE_LENGTH, "Sun, 06 Nov 1994 08:49:37 GMT".len());
}
#[test]
fn test_date() {
let settings = WorkerSettings::<()>::new(Vec::new(), KeepAlive::Os);
let mut buf1 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10);
settings.set_date(&mut buf1);
let mut buf2 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10);
settings.set_date(&mut buf2);
assert_eq!(buf1, buf2);
}
}

View File

@@ -18,7 +18,6 @@ use native_tls::TlsAcceptor;
#[cfg(feature="alpn")]
use openssl::ssl::{AlpnError, SslAcceptorBuilder};
use helpers;
use super::{IntoHttpHandler, IoStream, KeepAlive};
use super::{PauseServer, ResumeServer, StopServer};
use super::channel::{HttpChannel, WrapperStream};
@@ -41,6 +40,7 @@ pub struct HttpServer<H> where H: IntoHttpHandler + 'static
exit: bool,
shutdown_timeout: u16,
signals: Option<Addr<Syn, signal::ProcessSignals>>,
no_http2: bool,
no_signals: bool,
}
@@ -57,13 +57,8 @@ enum ServerCommand {
WorkerDied(usize, Info),
}
impl<H> Actor for HttpServer<H> where H: IntoHttpHandler
{
impl<H> Actor for HttpServer<H> where H: IntoHttpHandler {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
self.update_time(ctx);
}
}
impl<H> HttpServer<H> where H: IntoHttpHandler + 'static
@@ -89,15 +84,11 @@ impl<H> HttpServer<H> where H: IntoHttpHandler + 'static
exit: false,
shutdown_timeout: 30,
signals: None,
no_http2: false,
no_signals: false,
}
}
fn update_time(&self, ctx: &mut Context<Self>) {
helpers::update_date();
ctx.run_later(Duration::new(1, 0), |slf, ctx| slf.update_time(ctx));
}
/// Set number of workers to start.
///
/// By default http server uses number of available logical cpu as threads count.
@@ -170,9 +161,15 @@ impl<H> HttpServer<H> where H: IntoHttpHandler + 'static
self
}
/// Disable `HTTP/2` support
pub fn no_http2(mut self) -> Self {
self.no_http2 = true;
self
}
/// Get addresses of bound sockets.
pub fn addrs(&self) -> Vec<net::SocketAddr> {
self.sockets.iter().map(|s| s.0.clone()).collect()
self.sockets.iter().map(|s| s.0).collect()
}
/// Use listener for accepting incoming connection requests
@@ -396,15 +393,17 @@ impl<H: IntoHttpHandler> HttpServer<H>
Err(io::Error::new(io::ErrorKind::Other, "No socket addresses are bound"))
} else {
// alpn support
builder.set_alpn_protos(b"\x02h2\x08http/1.1")?;
builder.set_alpn_select_callback(|_, protos| {
const H2: &[u8] = b"\x02h2";
if protos.windows(3).any(|window| window == H2) {
Ok(b"h2")
} else {
Err(AlpnError::NOACK)
}
});
if !self.no_http2 {
builder.set_alpn_protos(b"\x02h2\x08http/1.1")?;
builder.set_alpn_select_callback(|_, protos| {
const H2: &[u8] = b"\x02h2";
if protos.windows(3).any(|window| window == H2) {
Ok(b"h2")
} else {
Err(AlpnError::NOACK)
}
});
}
let (tx, rx) = mpsc::unbounded();
let acceptor = builder.build();
@@ -734,6 +733,13 @@ fn start_accept_thread(
workers[next].0, info.clone()));
msg = err.into_inner();
workers.swap_remove(next);
if workers.is_empty() {
error!("No workers");
thread::sleep(sleep);
break
} else if workers.len() <= next {
next = 0;
}
continue
}
}
@@ -741,9 +747,12 @@ fn start_accept_thread(
break
}
},
Err(ref e) if connection_error(e) => continue,
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock =>
break,
Err(ref e) if connection_error(e) =>
continue,
Err(e) => {
error!("Error accepting connection: {:?}", e);
error!("Error accepting connection: {}", e);
// sleep after error
thread::sleep(sleep);
break

View File

@@ -22,7 +22,6 @@ use tokio_openssl::SslAcceptorExt;
use actix::*;
use actix::msgs::StopArbiter;
use helpers;
use server::{HttpHandler, KeepAlive};
use server::channel::HttpChannel;
use server::settings::WorkerSettings;
@@ -76,7 +75,7 @@ impl<H: HttpHandler + 'static> Worker<H> {
}
fn update_time(&self, ctx: &mut Context<Self>) {
helpers::update_date();
self.settings.update_date();
ctx.run_later(time::Duration::new(1, 0), |slf, ctx| slf.update_time(ctx));
}
@@ -204,7 +203,8 @@ impl StreamHandlerType {
} else {
false
};
Arbiter::handle().spawn(HttpChannel::new(h, io, peer, http2));
Arbiter::handle().spawn(
HttpChannel::new(h, io, peer, http2));
},
Err(err) =>
trace!("Error during handling tls connection: {}", err),

View File

@@ -5,7 +5,7 @@ use std::rc::Rc;
use std::sync::mpsc;
use std::str::FromStr;
use actix::{Arbiter, Addr, Syn, System, SystemRunner, msgs};
use actix::{Actor, Arbiter, Addr, Syn, System, SystemRunner, Unsync, msgs};
use cookie::Cookie;
use http::{Uri, Method, Version, HeaderMap, HttpTryFrom};
use http::header::HeaderName;
@@ -14,6 +14,9 @@ use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;
use net2::TcpBuilder;
#[cfg(feature="alpn")]
use openssl::ssl::SslAcceptor;
use ws;
use body::Binary;
use error::Error;
@@ -27,7 +30,7 @@ use payload::Payload;
use httprequest::HttpRequest;
use httpresponse::HttpResponse;
use server::{HttpServer, IntoHttpHandler, ServerSettings};
use client::{ClientRequest, ClientRequestBuilder};
use client::{ClientRequest, ClientRequestBuilder, ClientConnector};
/// The `TestServer` type.
///
@@ -60,6 +63,8 @@ pub struct TestServer {
thread: Option<thread::JoinHandle<()>>,
system: SystemRunner,
server_sys: Addr<Syn, System>,
ssl: bool,
conn: Addr<Unsync, ClientConnector>,
}
impl TestServer {
@@ -69,9 +74,26 @@ impl TestServer {
/// This method accepts configuration method. You can add
/// middlewares or set handlers for test application.
pub fn new<F>(config: F) -> Self
where F: Sync + Send + 'static + Fn(&mut TestApp<()>),
where F: Sync + Send + 'static + Fn(&mut TestApp<()>)
{
TestServer::with_state(||(), config)
TestServerBuilder::new(||()).start(config)
}
/// Create test server builder
pub fn build() -> TestServerBuilder<()> {
TestServerBuilder::new(||())
}
/// Create test server builder with specific state factory
///
/// This method can be used for constructing application state.
/// Also it can be used for external dependecy initialization,
/// like creating sync actors for diesel integration.
pub fn build_with_state<F, S>(state: F) -> TestServerBuilder<S>
where F: Fn() -> S + Sync + Send + 'static,
S: 'static,
{
TestServerBuilder::new(state)
}
/// Start new test server with application factory
@@ -87,23 +109,31 @@ impl TestServer {
let sys = System::new("actix-test-server");
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
let local_addr = tcp.local_addr().unwrap();
let tcp = TcpListener::from_listener(tcp, &local_addr, Arbiter::handle()).unwrap();
let tcp = TcpListener::from_listener(
tcp, &local_addr, Arbiter::handle()).unwrap();
HttpServer::new(factory).disable_signals().start_incoming(tcp.incoming(), false);
HttpServer::new(factory)
.disable_signals()
.start_incoming(tcp.incoming(), false);
tx.send((Arbiter::system(), local_addr)).unwrap();
let _ = sys.run();
});
let sys = System::new("actix-test");
let (server_sys, addr) = rx.recv().unwrap();
TestServer {
addr,
thread: Some(join),
system: System::new("actix-test"),
server_sys,
ssl: false,
conn: TestServer::get_conn(),
thread: Some(join),
system: sys,
}
}
#[deprecated(since="0.4.10",
note="please use `TestServer::build_with_state()` instead")]
/// Start new test server with custom application state
///
/// This method accepts state factory and configuration method.
@@ -132,12 +162,30 @@ impl TestServer {
let _ = sys.run();
});
let system = System::new("actix-test");
let (server_sys, addr) = rx.recv().unwrap();
TestServer {
addr,
server_sys,
system,
ssl: false,
conn: TestServer::get_conn(),
thread: Some(join),
system: System::new("actix-test"),
}
}
fn get_conn() -> Addr<Unsync, ClientConnector> {
#[cfg(feature="alpn")]
{
use openssl::ssl::{SslMethod, SslConnector, SslVerifyMode};
let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
builder.set_verify(SslVerifyMode::NONE);
ClientConnector::with_connector(builder.build()).start()
}
#[cfg(not(feature="alpn"))]
{
ClientConnector::default().start()
}
}
@@ -159,9 +207,9 @@ impl TestServer {
/// Construct test server url
pub fn url(&self, uri: &str) -> String {
if uri.starts_with('/') {
format!("http://{}{}", self.addr, uri)
format!("{}://{}{}", if self.ssl {"https"} else {"http"}, self.addr, uri)
} else {
format!("http://{}/{}", self.addr, uri)
format!("{}://{}/{}", if self.ssl {"https"} else {"http"}, self.addr, uri)
}
}
@@ -183,7 +231,8 @@ impl TestServer {
/// Connect to websocket server
pub fn ws(&mut self) -> Result<(ws::ClientReader, ws::ClientWriter), ws::ClientError> {
let url = self.url("/");
self.system.run_until_complete(ws::Client::new(url).connect())
self.system.run_until_complete(
ws::Client::with_connector(url, self.conn.clone()).connect())
}
/// Create `GET` request
@@ -205,7 +254,9 @@ impl TestServer {
pub fn client(&self, meth: Method, path: &str) -> ClientRequestBuilder {
ClientRequest::build()
.method(meth)
.uri(self.url(path).as_str()).take()
.uri(self.url(path).as_str())
.with_connector(self.conn.clone())
.take()
}
}
@@ -215,6 +266,101 @@ impl Drop for TestServer {
}
}
pub struct TestServerBuilder<S> {
state: Box<Fn() -> S + Sync + Send + 'static>,
#[cfg(feature="alpn")]
ssl: Option<SslAcceptor>,
}
impl<S: 'static> TestServerBuilder<S> {
pub fn new<F>(state: F) -> TestServerBuilder<S>
where F: Fn() -> S + Sync + Send + 'static
{
TestServerBuilder {
state: Box::new(state),
#[cfg(feature="alpn")]
ssl: None,
}
}
#[cfg(feature="alpn")]
/// Create ssl server
pub fn ssl(mut self, ssl: SslAcceptor) -> Self {
self.ssl = Some(ssl);
self
}
#[allow(unused_mut)]
/// Configure test application and run test server
pub fn start<F>(mut self, config: F) -> TestServer
where F: Sync + Send + 'static + Fn(&mut TestApp<S>),
{
let (tx, rx) = mpsc::channel();
#[cfg(feature="alpn")]
let ssl = self.ssl.is_some();
#[cfg(not(feature="alpn"))]
let ssl = false;
// run server in separate thread
let join = thread::spawn(move || {
let sys = System::new("actix-test-server");
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
let local_addr = tcp.local_addr().unwrap();
let tcp = TcpListener::from_listener(
tcp, &local_addr, Arbiter::handle()).unwrap();
let state = self.state;
let srv = HttpServer::new(move || {
let mut app = TestApp::new(state());
config(&mut app);
vec![app]})
.disable_signals();
#[cfg(feature="alpn")]
{
use std::io;
use futures::Stream;
use tokio_openssl::SslAcceptorExt;
let ssl = self.ssl.take();
if let Some(ssl) = ssl {
srv.start_incoming(
tcp.incoming()
.and_then(move |(sock, addr)| {
ssl.accept_async(sock)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
.map(move |s| (s, addr))
}),
false);
} else {
srv.start_incoming(tcp.incoming(), false);
}
}
#[cfg(not(feature="alpn"))]
{
srv.start_incoming(tcp.incoming(), false);
}
tx.send((Arbiter::system(), local_addr)).unwrap();
let _ = sys.run();
});
let system = System::new("actix-test");
let (server_sys, addr) = rx.recv().unwrap();
TestServer {
addr,
server_sys,
ssl,
system,
conn: TestServer::get_conn(),
thread: Some(join),
}
}
}
/// Test application helper for testing request handlers.
pub struct TestApp<S=()> {

View File

@@ -209,6 +209,15 @@ impl Client {
self
}
/// Set websocket handshake timeout
///
/// Handshake timeout is a total time for successful handshake.
/// Default value is 5 seconds.
pub fn timeout(mut self, timeout: Duration) -> Self {
self.request.timeout(timeout);
self
}
/// Connect to websocket server and do ws handshake
pub fn connect(&mut self) -> ClientHandshake {
if let Some(e) = self.err.take() {
@@ -445,16 +454,14 @@ impl Stream for ClientReader {
// read
match Frame::parse(&mut inner.rx, false, max_size) {
Ok(Async::Ready(Some(frame))) => {
let (finished, opcode, payload) = frame.unpack();
// continuation is not supported
if !finished {
inner.closed = true;
return Err(ProtocolError::NoContinuation)
}
let (_finished, opcode, payload) = frame.unpack();
match opcode {
OpCode::Continue => unimplemented!(),
// continuation is not supported
OpCode::Continue => {
inner.closed = true;
Err(ProtocolError::NoContinuation)
},
OpCode::Bad => {
inner.closed = true;
Err(ProtocolError::BadOpCode)

View File

@@ -205,7 +205,7 @@ impl<A, S> ActorHttpContext for WebsocketContext<A, S> where A: Actor<Context=Se
};
if self.inner.alive() && self.inner.poll(ctx).is_err() {
return Err(ErrorInternalServerError("error").into())
return Err(ErrorInternalServerError("error"))
}
// frames

31
tests/cert.pem Normal file
View File

@@ -0,0 +1,31 @@
-----BEGIN CERTIFICATE-----
MIIFPjCCAyYCCQDvLYiYD+jqeTANBgkqhkiG9w0BAQsFADBhMQswCQYDVQQGEwJV
UzELMAkGA1UECAwCQ0ExCzAJBgNVBAcMAlNGMRAwDgYDVQQKDAdDb21wYW55MQww
CgYDVQQLDANPcmcxGDAWBgNVBAMMD3d3dy5leGFtcGxlLmNvbTAeFw0xODAxMjUx
NzQ2MDFaFw0xOTAxMjUxNzQ2MDFaMGExCzAJBgNVBAYTAlVTMQswCQYDVQQIDAJD
QTELMAkGA1UEBwwCU0YxEDAOBgNVBAoMB0NvbXBhbnkxDDAKBgNVBAsMA09yZzEY
MBYGA1UEAwwPd3d3LmV4YW1wbGUuY29tMIICIjANBgkqhkiG9w0BAQEFAAOCAg8A
MIICCgKCAgEA2WzIA2IpVR9Tb9EFhITlxuhE5rY2a3S6qzYNzQVgSFggxXEPn8k1
sQEcer5BfAP986Sck3H0FvB4Bt/I8PwOtUCmhwcc8KtB5TcGPR4fjXnrpC+MIK5U
NLkwuyBDKziYzTdBj8kUFX1WxmvEHEgqToPOZfBgsS71cJAR/zOWraDLSRM54jXy
voLZN4Ti9rQagQrvTQ44Vz5ycDQy7UxtbUGh1CVv69vNVr7/SOOh/Nw5FNOZWLWr
odGyoec5wh9iqRZgRqiTUc6Lt7V2RWc2X2gjwST2UfI+U46Ip3oaQ7ZD4eAkoqND
xdniBZAykVG3c/99ux4BAESTF8fsNch6UticBxYMuTu+ouvP0psfI9wwwNliJDmA
CRUTB9AgRynbL1AzhqQoDfsb98IZfjfNOpwnwuLwpMAPhbgd5KNdZaIJ4Hb6/stI
yFElOExxd3TAxF2Gshd/lq1JcNHAZ1DSXV5MvOWT/NWgXwbIzUgQ8eIi+HuDYX2U
UuaB6R8tbd52H7rbUv6HrfinuSlKWqjSYLkiKHkwUpoMw8y9UycRSzs1E9nPwPTO
vRXb0mNCQeBCV9FvStNVXdCUTT8LGPv87xSD2pmt7LijlE6mHLG8McfcWkzA69un
CEHIFAFDimTuN7EBljc119xWFTcHMyoZAfFF+oTqwSbBGImruCxnaJECAwEAATAN
BgkqhkiG9w0BAQsFAAOCAgEApavsgsn7SpPHfhDSN5iZs1ILZQRewJg0Bty0xPfk
3tynSW6bNH3nSaKbpsdmxxomthNSQgD2heOq1By9YzeOoNR+7Pk3s4FkASnf3ToI
JNTUasBFFfaCG96s4Yvs8KiWS/k84yaWuU8c3Wb1jXs5Rv1qE1Uvuwat1DSGXSoD
JNluuIkCsC4kWkyq5pWCGQrabWPRTWsHwC3PTcwSRBaFgYLJaR72SloHB1ot02zL
d2age9dmFRFLLCBzP+D7RojBvL37qS/HR+rQ4SoQwiVc/JzaeqSe7ZbvEH9sZYEu
ALowJzgbwro7oZflwTWunSeSGDSltkqKjvWvZI61pwfHKDahUTmZ5h2y67FuGEaC
CIOUI8dSVSPKITxaq3JL4ze2e9/0Lt7hj19YK2uUmtMAW5Tirz4Yx5lyGH9U8Wur
y/X8VPxTc4A9TMlJgkyz0hqvhbPOT/zSWB10zXh0glKAsSBryAOEDxV1UygmSir7
YV8Qaq+oyKUTMc1MFq5vZ07M51EPaietn85t8V2Y+k/8XYltRp32NxsypxAJuyxh
g/ko6RVTrWa1sMvz/F9LFqAdKiK5eM96lh9IU4xiLg4ob8aS/GRAA8oIFkZFhLrt
tOwjIUPmEPyHWFi8dLpNuQKYalLYhuwZftG/9xV+wqhKGZO9iPrpHSYBRTap8w2y
1QU=
-----END CERTIFICATE-----

51
tests/key.pem Normal file
View File

@@ -0,0 +1,51 @@
-----BEGIN RSA PRIVATE KEY-----
MIIJKAIBAAKCAgEA2WzIA2IpVR9Tb9EFhITlxuhE5rY2a3S6qzYNzQVgSFggxXEP
n8k1sQEcer5BfAP986Sck3H0FvB4Bt/I8PwOtUCmhwcc8KtB5TcGPR4fjXnrpC+M
IK5UNLkwuyBDKziYzTdBj8kUFX1WxmvEHEgqToPOZfBgsS71cJAR/zOWraDLSRM5
4jXyvoLZN4Ti9rQagQrvTQ44Vz5ycDQy7UxtbUGh1CVv69vNVr7/SOOh/Nw5FNOZ
WLWrodGyoec5wh9iqRZgRqiTUc6Lt7V2RWc2X2gjwST2UfI+U46Ip3oaQ7ZD4eAk
oqNDxdniBZAykVG3c/99ux4BAESTF8fsNch6UticBxYMuTu+ouvP0psfI9wwwNli
JDmACRUTB9AgRynbL1AzhqQoDfsb98IZfjfNOpwnwuLwpMAPhbgd5KNdZaIJ4Hb6
/stIyFElOExxd3TAxF2Gshd/lq1JcNHAZ1DSXV5MvOWT/NWgXwbIzUgQ8eIi+HuD
YX2UUuaB6R8tbd52H7rbUv6HrfinuSlKWqjSYLkiKHkwUpoMw8y9UycRSzs1E9nP
wPTOvRXb0mNCQeBCV9FvStNVXdCUTT8LGPv87xSD2pmt7LijlE6mHLG8McfcWkzA
69unCEHIFAFDimTuN7EBljc119xWFTcHMyoZAfFF+oTqwSbBGImruCxnaJECAwEA
AQKCAgAME3aoeXNCPxMrSri7u4Xnnk71YXl0Tm9vwvjRQlMusXZggP8VKN/KjP0/
9AE/GhmoxqPLrLCZ9ZE1EIjgmZ9Xgde9+C8rTtfCG2RFUL7/5J2p6NonlocmxoJm
YkxYwjP6ce86RTjQWL3RF3s09u0inz9/efJk5O7M6bOWMQ9VZXDlBiRY5BYvbqUR
6FeSzD4MnMbdyMRoVBeXE88gTvZk8xhB6DJnLzYgc0tKiRoeKT0iYv5JZw25VyRM
ycLzfTrFmXCPfB1ylb483d9Ly4fBlM8nkx37PzEnAuukIawDxsPOb9yZC+hfvNJI
7NFiMN+3maEqG2iC00w4Lep4skHY7eHUEUMl+Wjr+koAy2YGLWAwHZQTm7iXn9Ab
L6adL53zyCKelRuEQOzbeosJAqS+5fpMK0ekXyoFIuskj7bWuIoCX7K/kg6q5IW+
vC2FrlsrbQ79GztWLVmHFO1I4J9M5r666YS0qdh8c+2yyRl4FmSiHfGxb3eOKpxQ
b6uI97iZlkxPF9LYUCSc7wq0V2gGz+6LnGvTHlHrOfVXqw/5pLAKhXqxvnroDTwz
0Ay/xFF6ei/NSxBY5t8ztGCBm45wCU3l8pW0X6dXqwUipw5b4MRy1VFRu6rqlmbL
OPSCuLxqyqsigiEYsBgS/icvXz9DWmCQMPd2XM9YhsHvUq+R4QKCAQEA98EuMMXI
6UKIt1kK2t/3OeJRyDd4iv/fCMUAnuPjLBvFE4cXD/SbqCxcQYqb+pue3PYkiTIC
71rN8OQAc5yKhzmmnCE5N26br/0pG4pwEjIr6mt8kZHmemOCNEzvhhT83nfKmV0g
9lNtuGEQMiwmZrpUOF51JOMC39bzcVjYX2Cmvb7cFbIq3lR0zwM+aZpQ4P8LHCIu
bgHmwbdlkLyIULJcQmHIbo6nPFB3ZZE4mqmjwY+rA6Fh9rgBa8OFCfTtrgeYXrNb
IgZQ5U8GoYRPNC2ot0vpTinraboa/cgm6oG4M7FW1POCJTl+/ktHEnKuO5oroSga
/BSg7hCNFVaOhwKCAQEA4Kkys0HtwEbV5mY/NnvUD5KwfXX7BxoXc9lZ6seVoLEc
KjgPYxqYRVrC7dB2YDwwp3qcRTi/uBAgFNm3iYlDzI4xS5SeaudUWjglj7BSgXE2
iOEa7EwcvVPluLaTgiWjlzUKeUCNNHWSeQOt+paBOT+IgwRVemGVpAgkqQzNh/nP
tl3p9aNtgzEm1qVlPclY/XUCtf3bcOR+z1f1b4jBdn0leu5OhnxkC+Htik+2fTXD
jt6JGrMkanN25YzsjnD3Sn+v6SO26H99wnYx5oMSdmb8SlWRrKtfJHnihphjG/YY
l1cyorV6M/asSgXNQfGJm4OuJi0I4/FL2wLUHnU+JwKCAQEAzh4WipcRthYXXcoj
gMKRkMOb3GFh1OpYqJgVExtudNTJmZxq8GhFU51MR27Eo7LycMwKy2UjEfTOnplh
Us2qZiPtW7k8O8S2m6yXlYUQBeNdq9IuuYDTaYD94vsazscJNSAeGodjE+uGvb1q
1wLqE87yoE7dUInYa1cOA3+xy2/CaNuviBFJHtzOrSb6tqqenQEyQf6h9/12+DTW
t5pSIiixHrzxHiFqOoCLRKGToQB+71rSINwTf0nITNpGBWmSj5VcC3VV3TG5/XxI
fPlxV2yhD5WFDPVNGBGvwPDSh4jSMZdZMSNBZCy4XWFNSKjGEWoK4DFYed3DoSt9
5IG1YwKCAQA63ntHl64KJUWlkwNbboU583FF3uWBjee5VqoGKHhf3CkKMxhtGqnt
+oN7t5VdUEhbinhqdx1dyPPvIsHCS3K1pkjqii4cyzNCVNYa2dQ00Qq+QWZBpwwc
3GAkz8rFXsGIPMDa1vxpU6mnBjzPniKMcsZ9tmQDppCEpBGfLpio2eAA5IkK8eEf
cIDB3CM0Vo94EvI76CJZabaE9IJ+0HIJb2+jz9BJ00yQBIqvJIYoNy9gP5Xjpi+T
qV/tdMkD5jwWjHD3AYHLWKUGkNwwkAYFeqT/gX6jpWBP+ZRPOp011X3KInJFSpKU
DT5GQ1Dux7EMTCwVGtXqjO8Ym5wjwwsfAoIBAEcxlhIW1G6BiNfnWbNPWBdh3v/K
5Ln98Rcrz8UIbWyl7qNPjYb13C1KmifVG1Rym9vWMO3KuG5atK3Mz2yLVRtmWAVc
fxzR57zz9MZFDun66xo+Z1wN3fVxQB4CYpOEI4Lb9ioX4v85hm3D6RpFukNtRQEc
Gfr4scTjJX4jFWDp0h6ffMb8mY+quvZoJ0TJqV9L9Yj6Ksdvqez/bdSraev97bHQ
4gbQxaTZ6WjaD4HjpPQefMdWp97Metg0ZQSS8b8EzmNFgyJ3XcjirzwliKTAQtn6
I2sd0NCIooelrKRD8EJoDUwxoOctY7R97wpZ7/wEHU45cBCbRV3H4JILS5c=
-----END RSA PRIVATE KEY-----

View File

@@ -66,6 +66,21 @@ fn test_simple() {
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
}
#[test]
fn test_with_query_parameter() {
let mut srv = test::TestServer::new(
|app| app.handler(|req: HttpRequest| match req.query().get("qp") {
Some(_) => httpcodes::HTTPOk.build().finish(),
None => httpcodes::HTTPBadRequest.build().finish(),
}));
let request = srv.get().uri(srv.url("/?qp=5").as_str()).finish().unwrap();
let response = srv.execute(request.send()).unwrap();
assert!(response.status().is_success());
}
#[test]
fn test_no_decompress() {
let mut srv = test::TestServer::new(

View File

@@ -743,7 +743,7 @@ fn test_h2() {
})
});
let _res = core.run(tcp);
// assert_eq!(_res.unwrap(), Bytes::from_static(STR.as_ref()));
// assert_eq!(res.unwrap(), Bytes::from_static(STR.as_ref()));
}
#[test]

View File

@@ -3,9 +3,14 @@ extern crate actix_web;
extern crate futures;
extern crate http;
extern crate bytes;
extern crate rand;
use bytes::Bytes;
use futures::Stream;
use rand::Rng;
#[cfg(feature="alpn")]
extern crate openssl;
use actix_web::*;
use actix::prelude::*;
@@ -51,3 +56,138 @@ fn test_simple() {
let (item, _) = srv.execute(reader.into_future()).unwrap();
assert_eq!(item, Some(ws::Message::Close(ws::CloseCode::Normal)));
}
#[test]
fn test_large_text() {
let data = rand::thread_rng()
.gen_ascii_chars()
.take(65_536)
.collect::<String>();
let mut srv = test::TestServer::new(
|app| app.handler(|req| ws::start(req, Ws)));
let (mut reader, mut writer) = srv.ws().unwrap();
for _ in 0..100 {
writer.text(data.clone());
let (item, r) = srv.execute(reader.into_future()).unwrap();
reader = r;
assert_eq!(item, Some(ws::Message::Text(data.clone())));
}
}
#[test]
fn test_large_bin() {
let data = rand::thread_rng()
.gen_ascii_chars()
.take(65_536)
.collect::<String>();
let mut srv = test::TestServer::new(
|app| app.handler(|req| ws::start(req, Ws)));
let (mut reader, mut writer) = srv.ws().unwrap();
for _ in 0..100 {
writer.binary(data.clone());
let (item, r) = srv.execute(reader.into_future()).unwrap();
reader = r;
assert_eq!(item, Some(ws::Message::Binary(Binary::from(data.clone()))));
}
}
struct Ws2 {
count: usize,
bin: bool,
}
impl Actor for Ws2 {
type Context = ws::WebsocketContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
self.send(ctx);
}
}
impl Ws2 {
fn send(&mut self, ctx: &mut ws::WebsocketContext<Self>) {
if self.bin {
ctx.binary(Vec::from("0".repeat(65_536)));
} else {
ctx.text("0".repeat(65_536));
}
ctx.drain().and_then(|_, act, ctx| {
act.count += 1;
if act.count != 10_000 {
act.send(ctx);
}
actix::fut::ok(())
}).wait(ctx);
}
}
impl StreamHandler<ws::Message, ws::ProtocolError> for Ws2 {
fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
match msg {
ws::Message::Ping(msg) => ctx.pong(&msg),
ws::Message::Text(text) => ctx.text(text),
ws::Message::Binary(bin) => ctx.binary(bin),
ws::Message::Close(reason) => ctx.close(reason, ""),
_ => (),
}
}
}
#[test]
fn test_server_send_text() {
let data = Some(ws::Message::Text("0".repeat(65_536)));
let mut srv = test::TestServer::new(
|app| app.handler(|req| ws::start(req, Ws2{count:0, bin: false})));
let (mut reader, _writer) = srv.ws().unwrap();
for _ in 0..10_000 {
let (item, r) = srv.execute(reader.into_future()).unwrap();
reader = r;
assert_eq!(item, data);
}
}
#[test]
fn test_server_send_bin() {
let data = Some(ws::Message::Binary(Binary::from("0".repeat(65_536))));
let mut srv = test::TestServer::new(
|app| app.handler(|req| ws::start(req, Ws2{count:0, bin: true})));
let (mut reader, _writer) = srv.ws().unwrap();
for _ in 0..10_000 {
let (item, r) = srv.execute(reader.into_future()).unwrap();
reader = r;
assert_eq!(item, data);
}
}
#[test]
#[cfg(feature="alpn")]
fn test_ws_server_ssl() {
extern crate openssl;
use openssl::ssl::{SslMethod, SslAcceptor, SslFiletype};
// load ssl keys
let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap();
builder.set_private_key_file("tests/key.pem", SslFiletype::PEM).unwrap();
builder.set_certificate_chain_file("tests/cert.pem").unwrap();
let mut srv = test::TestServer::build()
.ssl(builder.build())
.start(|app| app.handler(|req| ws::start(req, Ws2{count:0, bin: false})));
let (mut reader, _writer) = srv.ws().unwrap();
let data = Some(ws::Message::Text("0".repeat(65_536)));
for _ in 0..10_000 {
let (item, r) = srv.execute(reader.into_future()).unwrap();
reader = r;
assert_eq!(item, data);
}
}