mirror of
https://github.com/fafhrd91/actix-web
synced 2025-07-24 09:47:49 +02:00
Compare commits
16 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
ce6d237cc1 | ||
|
70caa2552b | ||
|
ee7d58dd7f | ||
|
c4f4cadb43 | ||
|
978091cedb | ||
|
8198f5e10a | ||
|
6cd40df387 | ||
|
35ee5d36d8 | ||
|
e7ec0f9fd7 | ||
|
f4a47ef71e | ||
|
6b1a79fab8 | ||
|
ab73da4a1a | ||
|
e0c8da567c | ||
|
c10dedf7e4 | ||
|
ec192e0ab1 | ||
|
6d792d9948 |
@@ -12,9 +12,6 @@ matrix:
|
||||
- rust: stable
|
||||
- rust: beta
|
||||
- rust: nightly
|
||||
allow_failures:
|
||||
- rust: nightly
|
||||
- rust: beta
|
||||
|
||||
#rust:
|
||||
# - 1.21.0
|
||||
|
15
CHANGES.md
15
CHANGES.md
@@ -1,5 +1,20 @@
|
||||
# Changes
|
||||
|
||||
## 0.4.10 (2018-03-20)
|
||||
|
||||
* Use `Error` instead of `InternalError` for `error::ErrorXXXX` methods
|
||||
|
||||
* Allow to set client request timeout
|
||||
|
||||
* Allow to set client websocket handshake timeout
|
||||
|
||||
* Refactor `TestServer` configuration
|
||||
|
||||
* Fix server websockets big payloads support
|
||||
|
||||
* Fix http/2 date header generation
|
||||
|
||||
|
||||
## 0.4.9 (2018-03-16)
|
||||
|
||||
* Allow to disable http/2 support
|
||||
|
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "actix-web"
|
||||
version = "0.4.9"
|
||||
version = "0.4.10"
|
||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
description = "Actix web is a simple, pragmatic, extremely fast, web framework for Rust."
|
||||
readme = "README.md"
|
||||
@@ -42,7 +42,7 @@ session = ["cookie/secure"]
|
||||
brotli = ["brotli2"]
|
||||
|
||||
[dependencies]
|
||||
actix = "^0.5.4"
|
||||
actix = "^0.5.5"
|
||||
|
||||
base64 = "0.9"
|
||||
bitflags = "1.0"
|
||||
|
@@ -124,7 +124,8 @@ fn main() {
|
||||
}
|
||||
}))
|
||||
.resource("/error.html", |r| r.f(|req| {
|
||||
error::ErrorBadRequest(io::Error::new(io::ErrorKind::Other, "test"))
|
||||
error::InternalError::new(
|
||||
io::Error::new(io::ErrorKind::Other, "test"), StatusCode::OK)
|
||||
}))
|
||||
// static files
|
||||
.handler("/static/", fs::StaticFiles::new("../static/", true))
|
||||
|
@@ -235,6 +235,7 @@ impl<'a> From<&'a Arc<Vec<u8>>> for Binary {
|
||||
}
|
||||
|
||||
impl AsRef<[u8]> for Binary {
|
||||
#[inline]
|
||||
fn as_ref(&self) -> &[u8] {
|
||||
match *self {
|
||||
Binary::Bytes(ref bytes) => bytes.as_ref(),
|
||||
|
@@ -1,6 +1,7 @@
|
||||
use std::{fmt, mem};
|
||||
use std::fmt::Write as FmtWrite;
|
||||
use std::io::Write;
|
||||
use std::time::Duration;
|
||||
|
||||
use actix::{Addr, Unsync};
|
||||
use cookie::{Cookie, CookieJar};
|
||||
@@ -26,6 +27,7 @@ pub struct ClientRequest {
|
||||
body: Body,
|
||||
chunked: bool,
|
||||
upgrade: bool,
|
||||
timeout: Option<Duration>,
|
||||
encoding: ContentEncoding,
|
||||
response_decompress: bool,
|
||||
buffer_capacity: usize,
|
||||
@@ -49,6 +51,7 @@ impl Default for ClientRequest {
|
||||
body: Body::Empty,
|
||||
chunked: false,
|
||||
upgrade: false,
|
||||
timeout: None,
|
||||
encoding: ContentEncoding::Auto,
|
||||
response_decompress: true,
|
||||
buffer_capacity: 32_768,
|
||||
@@ -204,10 +207,16 @@ impl ClientRequest {
|
||||
///
|
||||
/// This method returns future that resolves to a ClientResponse
|
||||
pub fn send(mut self) -> SendRequest {
|
||||
match mem::replace(&mut self.conn, ConnectionType::Default) {
|
||||
let timeout = self.timeout.take();
|
||||
let send = match mem::replace(&mut self.conn, ConnectionType::Default) {
|
||||
ConnectionType::Default => SendRequest::new(self),
|
||||
ConnectionType::Connector(conn) => SendRequest::with_connector(self, conn),
|
||||
ConnectionType::Connection(conn) => SendRequest::with_connection(self, conn),
|
||||
};
|
||||
if let Some(timeout) = timeout {
|
||||
send.timeout(timeout)
|
||||
} else {
|
||||
send
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -224,7 +233,6 @@ impl fmt::Debug for ClientRequest {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// An HTTP Client request builder
|
||||
///
|
||||
/// This type can be used to construct an instance of `ClientRequest` through a
|
||||
@@ -476,6 +484,17 @@ impl ClientRequestBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
/// Set request timeout
|
||||
///
|
||||
/// Request timeout is a total time before response should be received.
|
||||
/// Default value is 5 seconds.
|
||||
pub fn timeout(&mut self, timeout: Duration) -> &mut Self {
|
||||
if let Some(parts) = parts(&mut self.request, &self.err) {
|
||||
parts.timeout = Some(timeout);
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
/// Send request using custom connector
|
||||
pub fn with_connector(&mut self, conn: Addr<Unsync, ClientConnector>) -> &mut Self {
|
||||
if let Some(parts) = parts(&mut self.request, &self.err) {
|
||||
|
@@ -191,7 +191,7 @@ impl<A, S> ActorHttpContext for HttpContext<A, S> where A: Actor<Context=Self>,
|
||||
if self.inner.alive() {
|
||||
match self.inner.poll(ctx) {
|
||||
Ok(Async::NotReady) | Ok(Async::Ready(())) => (),
|
||||
Err(_) => return Err(ErrorInternalServerError("error").into()),
|
||||
Err(_) => return Err(ErrorInternalServerError("error")),
|
||||
}
|
||||
}
|
||||
|
||||
|
69
src/error.rs
69
src/error.rs
@@ -575,68 +575,91 @@ impl<T> Responder for InternalError<T>
|
||||
|
||||
/// Helper function that creates wrapper of any error and generate *BAD REQUEST* response.
|
||||
#[allow(non_snake_case)]
|
||||
pub fn ErrorBadRequest<T>(err: T) -> InternalError<T> {
|
||||
InternalError::new(err, StatusCode::BAD_REQUEST)
|
||||
pub fn ErrorBadRequest<T>(err: T) -> Error
|
||||
where T: Send + Sync + fmt::Debug + 'static
|
||||
{
|
||||
InternalError::new(err, StatusCode::BAD_REQUEST).into()
|
||||
}
|
||||
|
||||
/// Helper function that creates wrapper of any error and generate *UNAUTHORIZED* response.
|
||||
#[allow(non_snake_case)]
|
||||
pub fn ErrorUnauthorized<T>(err: T) -> InternalError<T> {
|
||||
InternalError::new(err, StatusCode::UNAUTHORIZED)
|
||||
pub fn ErrorUnauthorized<T>(err: T) -> Error
|
||||
where T: Send + Sync + fmt::Debug + 'static
|
||||
{
|
||||
InternalError::new(err, StatusCode::UNAUTHORIZED).into()
|
||||
}
|
||||
|
||||
/// Helper function that creates wrapper of any error and generate *FORBIDDEN* response.
|
||||
#[allow(non_snake_case)]
|
||||
pub fn ErrorForbidden<T>(err: T) -> InternalError<T> {
|
||||
InternalError::new(err, StatusCode::FORBIDDEN)
|
||||
pub fn ErrorForbidden<T>(err: T) -> Error
|
||||
where T: Send + Sync + fmt::Debug + 'static
|
||||
{
|
||||
InternalError::new(err, StatusCode::FORBIDDEN).into()
|
||||
}
|
||||
|
||||
/// Helper function that creates wrapper of any error and generate *NOT FOUND* response.
|
||||
#[allow(non_snake_case)]
|
||||
pub fn ErrorNotFound<T>(err: T) -> InternalError<T> {
|
||||
InternalError::new(err, StatusCode::NOT_FOUND)
|
||||
pub fn ErrorNotFound<T>(err: T) -> Error
|
||||
where T: Send + Sync + fmt::Debug + 'static
|
||||
{
|
||||
InternalError::new(err, StatusCode::NOT_FOUND).into()
|
||||
}
|
||||
|
||||
/// Helper function that creates wrapper of any error and generate *METHOD NOT ALLOWED* response.
|
||||
#[allow(non_snake_case)]
|
||||
pub fn ErrorMethodNotAllowed<T>(err: T) -> InternalError<T> {
|
||||
InternalError::new(err, StatusCode::METHOD_NOT_ALLOWED)
|
||||
pub fn ErrorMethodNotAllowed<T>(err: T) -> Error
|
||||
where T: Send + Sync + fmt::Debug + 'static
|
||||
{
|
||||
InternalError::new(err, StatusCode::METHOD_NOT_ALLOWED).into()
|
||||
}
|
||||
|
||||
/// Helper function that creates wrapper of any error and generate *REQUEST TIMEOUT* response.
|
||||
#[allow(non_snake_case)]
|
||||
pub fn ErrorRequestTimeout<T>(err: T) -> InternalError<T> {
|
||||
InternalError::new(err, StatusCode::REQUEST_TIMEOUT)
|
||||
pub fn ErrorRequestTimeout<T>(err: T) -> Error
|
||||
where T: Send + Sync + fmt::Debug + 'static
|
||||
{
|
||||
InternalError::new(err, StatusCode::REQUEST_TIMEOUT).into()
|
||||
}
|
||||
|
||||
/// Helper function that creates wrapper of any error and generate *CONFLICT* response.
|
||||
#[allow(non_snake_case)]
|
||||
pub fn ErrorConflict<T>(err: T) -> InternalError<T> {
|
||||
InternalError::new(err, StatusCode::CONFLICT)
|
||||
pub fn ErrorConflict<T>(err: T) -> Error
|
||||
where T: Send + Sync + fmt::Debug + 'static
|
||||
{
|
||||
InternalError::new(err, StatusCode::CONFLICT).into()
|
||||
}
|
||||
|
||||
/// Helper function that creates wrapper of any error and generate *GONE* response.
|
||||
#[allow(non_snake_case)]
|
||||
pub fn ErrorGone<T>(err: T) -> InternalError<T> {
|
||||
InternalError::new(err, StatusCode::GONE)
|
||||
pub fn ErrorGone<T>(err: T) -> Error
|
||||
where T: Send + Sync + fmt::Debug + 'static
|
||||
{
|
||||
InternalError::new(err, StatusCode::GONE).into()
|
||||
}
|
||||
|
||||
/// Helper function that creates wrapper of any error and generate *PRECONDITION FAILED* response.
|
||||
#[allow(non_snake_case)]
|
||||
pub fn ErrorPreconditionFailed<T>(err: T) -> InternalError<T> {
|
||||
InternalError::new(err, StatusCode::PRECONDITION_FAILED)
|
||||
pub fn ErrorPreconditionFailed<T>(err: T) -> Error
|
||||
where T: Send + Sync + fmt::Debug + 'static
|
||||
{
|
||||
InternalError::new(err, StatusCode::PRECONDITION_FAILED).into()
|
||||
}
|
||||
|
||||
/// Helper function that creates wrapper of any error and generate *EXPECTATION FAILED* response.
|
||||
#[allow(non_snake_case)]
|
||||
pub fn ErrorExpectationFailed<T>(err: T) -> InternalError<T> {
|
||||
InternalError::new(err, StatusCode::EXPECTATION_FAILED)
|
||||
pub fn ErrorExpectationFailed<T>(err: T) -> Error
|
||||
where T: Send + Sync + fmt::Debug + 'static
|
||||
{
|
||||
InternalError::new(err, StatusCode::EXPECTATION_FAILED).into()
|
||||
}
|
||||
|
||||
/// Helper function that creates wrapper of any error and generate *INTERNAL SERVER ERROR* response.
|
||||
/// Helper function that creates wrapper of any error and
|
||||
/// generate *INTERNAL SERVER ERROR* response.
|
||||
#[allow(non_snake_case)]
|
||||
pub fn ErrorInternalServerError<T>(err: T) -> InternalError<T> {
|
||||
InternalError::new(err, StatusCode::INTERNAL_SERVER_ERROR)
|
||||
pub fn ErrorInternalServerError<T>(err: T) -> Error
|
||||
where T: Send + Sync + fmt::Debug + 'static
|
||||
{
|
||||
InternalError::new(err, StatusCode::INTERNAL_SERVER_ERROR).into()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
@@ -149,6 +149,8 @@ impl ContentEncoding {
|
||||
ContentEncoding::Identity | ContentEncoding::Auto => "identity",
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
/// default quality value
|
||||
pub fn quality(&self) -> f64 {
|
||||
match *self {
|
||||
|
@@ -1,71 +1,13 @@
|
||||
use std::{str, mem, ptr, slice};
|
||||
use std::{mem, ptr, slice};
|
||||
use std::cell::RefCell;
|
||||
use std::fmt::{self, Write};
|
||||
use std::rc::Rc;
|
||||
use std::ops::{Deref, DerefMut};
|
||||
use std::collections::VecDeque;
|
||||
use time;
|
||||
use bytes::{BufMut, BytesMut};
|
||||
use http::Version;
|
||||
|
||||
use httprequest::HttpInnerMessage;
|
||||
|
||||
// "Sun, 06 Nov 1994 08:49:37 GMT".len()
|
||||
pub(crate) const DATE_VALUE_LENGTH: usize = 29;
|
||||
|
||||
pub(crate) fn date(dst: &mut BytesMut) {
|
||||
CACHED.with(|cache| {
|
||||
let mut buf: [u8; 39] = unsafe { mem::uninitialized() };
|
||||
buf[..6].copy_from_slice(b"date: ");
|
||||
buf[6..35].copy_from_slice(cache.borrow().buffer());
|
||||
buf[35..].copy_from_slice(b"\r\n\r\n");
|
||||
dst.extend_from_slice(&buf);
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn date_value(dst: &mut BytesMut) {
|
||||
CACHED.with(|cache| {
|
||||
dst.extend_from_slice(cache.borrow().buffer());
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn update_date() {
|
||||
CACHED.with(|cache| {
|
||||
cache.borrow_mut().update();
|
||||
});
|
||||
}
|
||||
|
||||
struct CachedDate {
|
||||
bytes: [u8; DATE_VALUE_LENGTH],
|
||||
pos: usize,
|
||||
}
|
||||
|
||||
thread_local!(static CACHED: RefCell<CachedDate> = RefCell::new(CachedDate {
|
||||
bytes: [0; DATE_VALUE_LENGTH],
|
||||
pos: 0,
|
||||
}));
|
||||
|
||||
impl CachedDate {
|
||||
fn buffer(&self) -> &[u8] {
|
||||
&self.bytes[..]
|
||||
}
|
||||
|
||||
fn update(&mut self) {
|
||||
self.pos = 0;
|
||||
write!(self, "{}", time::at_utc(time::get_time()).rfc822()).unwrap();
|
||||
assert_eq!(self.pos, DATE_VALUE_LENGTH);
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Write for CachedDate {
|
||||
fn write_str(&mut self, s: &str) -> fmt::Result {
|
||||
let len = s.len();
|
||||
self.bytes[self.pos..self.pos + len].copy_from_slice(s.as_bytes());
|
||||
self.pos += len;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Internal use only! unsafe
|
||||
pub(crate) struct SharedMessagePool(RefCell<VecDeque<Rc<HttpInnerMessage>>>);
|
||||
|
||||
@@ -202,7 +144,7 @@ pub(crate) fn write_status_line(version: Version, mut n: u16, bytes: &mut BytesM
|
||||
}
|
||||
}
|
||||
|
||||
bytes.extend_from_slice(&buf);
|
||||
bytes.put_slice(&buf);
|
||||
if four {
|
||||
bytes.put(b' ');
|
||||
}
|
||||
@@ -214,7 +156,7 @@ pub(crate) fn write_content_length(mut n: usize, bytes: &mut BytesMut) {
|
||||
b'n',b't',b'-',b'l',b'e',b'n',b'g',
|
||||
b't',b'h',b':',b' ',b'0',b'\r',b'\n'];
|
||||
buf[18] = (n as u8) + b'0';
|
||||
bytes.extend_from_slice(&buf);
|
||||
bytes.put_slice(&buf);
|
||||
} else if n < 100 {
|
||||
let mut buf: [u8; 22] = [b'\r',b'\n',b'c',b'o',b'n',b't',b'e',
|
||||
b'n',b't',b'-',b'l',b'e',b'n',b'g',
|
||||
@@ -224,7 +166,7 @@ pub(crate) fn write_content_length(mut n: usize, bytes: &mut BytesMut) {
|
||||
ptr::copy_nonoverlapping(
|
||||
DEC_DIGITS_LUT.as_ptr().offset(d1 as isize), buf.as_mut_ptr().offset(18), 2);
|
||||
}
|
||||
bytes.extend_from_slice(&buf);
|
||||
bytes.put_slice(&buf);
|
||||
} else if n < 1000 {
|
||||
let mut buf: [u8; 23] = [b'\r',b'\n',b'c',b'o',b'n',b't',b'e',
|
||||
b'n',b't',b'-',b'l',b'e',b'n',b'g',
|
||||
@@ -238,9 +180,9 @@ pub(crate) fn write_content_length(mut n: usize, bytes: &mut BytesMut) {
|
||||
// decode last 1
|
||||
buf[18] = (n as u8) + b'0';
|
||||
|
||||
bytes.extend_from_slice(&buf);
|
||||
bytes.put_slice(&buf);
|
||||
} else {
|
||||
bytes.extend_from_slice(b"\r\ncontent-length: ");
|
||||
bytes.put_slice(b"\r\ncontent-length: ");
|
||||
convert_usize(n, bytes);
|
||||
}
|
||||
}
|
||||
@@ -299,20 +241,6 @@ pub(crate) fn convert_usize(mut n: usize, bytes: &mut BytesMut) {
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_date_len() {
|
||||
assert_eq!(DATE_VALUE_LENGTH, "Sun, 06 Nov 1994 08:49:37 GMT".len());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_date() {
|
||||
let mut buf1 = BytesMut::new();
|
||||
date(&mut buf1);
|
||||
let mut buf2 = BytesMut::new();
|
||||
date(&mut buf2);
|
||||
assert_eq!(buf1, buf2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_write_content_length() {
|
||||
let mut bytes = BytesMut::new();
|
||||
|
@@ -203,11 +203,10 @@ impl<S> HttpRequest<S> {
|
||||
#[inline]
|
||||
pub fn uri(&self) -> &Uri { &self.as_ref().uri }
|
||||
|
||||
#[doc(hidden)]
|
||||
#[inline]
|
||||
/// Modify the Request Uri.
|
||||
/// Returns mutable the Request Uri.
|
||||
///
|
||||
/// This might be useful for middlewares, i.e. path normalization
|
||||
/// This might be useful for middlewares, e.g. path normalization.
|
||||
#[inline]
|
||||
pub fn uri_mut(&mut self) -> &mut Uri {
|
||||
&mut self.as_mut().uri
|
||||
}
|
||||
@@ -222,7 +221,9 @@ impl<S> HttpRequest<S> {
|
||||
self.as_ref().version
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
///Returns mutable Request's headers.
|
||||
///
|
||||
///This is intended to be used by middleware.
|
||||
#[inline]
|
||||
pub fn headers_mut(&mut self) -> &mut HeaderMap {
|
||||
&mut self.as_mut().headers
|
||||
|
@@ -1,7 +1,8 @@
|
||||
//! Http response
|
||||
use std::{mem, str, fmt};
|
||||
use std::rc::Rc;
|
||||
use std::io::Write;
|
||||
use std::cell::RefCell;
|
||||
use std::cell::UnsafeCell;
|
||||
use std::collections::VecDeque;
|
||||
|
||||
use cookie::{Cookie, CookieJar};
|
||||
@@ -34,12 +35,12 @@ pub enum ConnectionType {
|
||||
}
|
||||
|
||||
/// An HTTP Response
|
||||
pub struct HttpResponse(Option<Box<InnerHttpResponse>>);
|
||||
pub struct HttpResponse(Option<Box<InnerHttpResponse>>, Rc<UnsafeCell<Pool>>);
|
||||
|
||||
impl Drop for HttpResponse {
|
||||
fn drop(&mut self) {
|
||||
if let Some(inner) = self.0.take() {
|
||||
Pool::release(inner)
|
||||
Pool::release(&self.1, inner)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -61,8 +62,10 @@ impl HttpResponse {
|
||||
/// Create http response builder with specific status.
|
||||
#[inline]
|
||||
pub fn build(status: StatusCode) -> HttpResponseBuilder {
|
||||
let (msg, pool) = Pool::get(status);
|
||||
HttpResponseBuilder {
|
||||
response: Some(Pool::get(status)),
|
||||
response: Some(msg),
|
||||
pool: Some(pool),
|
||||
err: None,
|
||||
cookies: None,
|
||||
}
|
||||
@@ -71,7 +74,8 @@ impl HttpResponse {
|
||||
/// Constructs a response
|
||||
#[inline]
|
||||
pub fn new(status: StatusCode, body: Body) -> HttpResponse {
|
||||
HttpResponse(Some(Pool::with_body(status, body)))
|
||||
let (msg, pool) = Pool::with_body(status, body);
|
||||
HttpResponse(Some(msg), pool)
|
||||
}
|
||||
|
||||
/// Constructs a error response
|
||||
@@ -232,9 +236,9 @@ impl fmt::Debug for HttpResponse {
|
||||
///
|
||||
/// This type can be used to construct an instance of `HttpResponse` through a
|
||||
/// builder-like pattern.
|
||||
#[derive(Debug)]
|
||||
pub struct HttpResponseBuilder {
|
||||
response: Option<Box<InnerHttpResponse>>,
|
||||
pool: Option<Rc<UnsafeCell<Pool>>>,
|
||||
err: Option<HttpError>,
|
||||
cookies: Option<CookieJar>,
|
||||
}
|
||||
@@ -506,7 +510,7 @@ impl HttpResponseBuilder {
|
||||
}
|
||||
}
|
||||
response.body = body.into();
|
||||
Ok(HttpResponse(Some(response)))
|
||||
Ok(HttpResponse(Some(response), self.pool.take().unwrap()))
|
||||
}
|
||||
|
||||
/// Set a streaming body and generate `HttpResponse`.
|
||||
@@ -547,6 +551,7 @@ impl HttpResponseBuilder {
|
||||
pub fn take(&mut self) -> HttpResponseBuilder {
|
||||
HttpResponseBuilder {
|
||||
response: self.response.take(),
|
||||
pool: self.pool.take(),
|
||||
err: self.err.take(),
|
||||
cookies: self.cookies.take(),
|
||||
}
|
||||
@@ -748,55 +753,56 @@ impl InnerHttpResponse {
|
||||
/// Internal use only! unsafe
|
||||
struct Pool(VecDeque<Box<InnerHttpResponse>>);
|
||||
|
||||
thread_local!(static POOL: RefCell<Pool> =
|
||||
RefCell::new(Pool(VecDeque::with_capacity(128))));
|
||||
thread_local!(static POOL: Rc<UnsafeCell<Pool>> =
|
||||
Rc::new(UnsafeCell::new(Pool(VecDeque::with_capacity(128)))));
|
||||
|
||||
impl Pool {
|
||||
|
||||
#[inline]
|
||||
fn get(status: StatusCode) -> Box<InnerHttpResponse> {
|
||||
fn get(status: StatusCode) -> (Box<InnerHttpResponse>, Rc<UnsafeCell<Pool>>) {
|
||||
POOL.with(|pool| {
|
||||
if let Some(mut resp) = pool.borrow_mut().0.pop_front() {
|
||||
let p = unsafe{&mut *pool.as_ref().get()};
|
||||
if let Some(mut resp) = p.0.pop_front() {
|
||||
resp.body = Body::Empty;
|
||||
resp.status = status;
|
||||
resp
|
||||
(resp, Rc::clone(pool))
|
||||
} else {
|
||||
Box::new(InnerHttpResponse::new(status, Body::Empty))
|
||||
(Box::new(InnerHttpResponse::new(status, Body::Empty)), Rc::clone(pool))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn with_body(status: StatusCode, body: Body) -> Box<InnerHttpResponse> {
|
||||
fn with_body(status: StatusCode, body: Body)
|
||||
-> (Box<InnerHttpResponse>, Rc<UnsafeCell<Pool>>) {
|
||||
POOL.with(|pool| {
|
||||
if let Some(mut resp) = pool.borrow_mut().0.pop_front() {
|
||||
let p = unsafe{&mut *pool.as_ref().get()};
|
||||
if let Some(mut resp) = p.0.pop_front() {
|
||||
resp.status = status;
|
||||
resp.body = body;
|
||||
resp
|
||||
(resp, Rc::clone(pool))
|
||||
} else {
|
||||
Box::new(InnerHttpResponse::new(status, body))
|
||||
(Box::new(InnerHttpResponse::new(status, body)), Rc::clone(pool))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
#[cfg_attr(feature = "cargo-clippy", allow(boxed_local, inline_always))]
|
||||
fn release(mut inner: Box<InnerHttpResponse>) {
|
||||
POOL.with(|pool| {
|
||||
let v = &mut pool.borrow_mut().0;
|
||||
if v.len() < 128 {
|
||||
inner.headers.clear();
|
||||
inner.version = None;
|
||||
inner.chunked = None;
|
||||
inner.reason = None;
|
||||
inner.encoding = None;
|
||||
inner.connection_type = None;
|
||||
inner.response_size = 0;
|
||||
inner.error = None;
|
||||
inner.write_capacity = MAX_WRITE_BUFFER_SIZE;
|
||||
v.push_front(inner);
|
||||
}
|
||||
})
|
||||
fn release(pool: &Rc<UnsafeCell<Pool>>, mut inner: Box<InnerHttpResponse>) {
|
||||
let pool = unsafe{&mut *pool.as_ref().get()};
|
||||
if pool.0.len() < 128 {
|
||||
inner.headers.clear();
|
||||
inner.version = None;
|
||||
inner.chunked = None;
|
||||
inner.reason = None;
|
||||
inner.encoding = None;
|
||||
inner.connection_type = None;
|
||||
inner.response_size = 0;
|
||||
inner.error = None;
|
||||
inner.write_capacity = MAX_WRITE_BUFFER_SIZE;
|
||||
pool.0.push_front(inner);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -4,9 +4,10 @@ use std::path::PathBuf;
|
||||
use std::str::FromStr;
|
||||
use std::slice::Iter;
|
||||
use std::borrow::Cow;
|
||||
use http::StatusCode;
|
||||
use smallvec::SmallVec;
|
||||
|
||||
use error::{ResponseError, UriSegmentError, InternalError, ErrorBadRequest};
|
||||
use error::{ResponseError, UriSegmentError, InternalError};
|
||||
|
||||
|
||||
/// A trait to abstract the idea of creating a new instance of a type from a path parameter.
|
||||
@@ -144,7 +145,8 @@ macro_rules! FROM_STR {
|
||||
type Err = InternalError<<$type as FromStr>::Err>;
|
||||
|
||||
fn from_param(val: &str) -> Result<Self, Self::Err> {
|
||||
<$type as FromStr>::from_str(val).map_err(ErrorBadRequest)
|
||||
<$type as FromStr>::from_str(val)
|
||||
.map_err(|e| InternalError::new(e, StatusCode::BAD_REQUEST))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -159,6 +159,12 @@ impl PayloadWriter for PayloadSender {
|
||||
if shared.borrow().need_read {
|
||||
PayloadStatus::Read
|
||||
} else {
|
||||
#[cfg(not(test))]
|
||||
{
|
||||
if shared.borrow_mut().io_task.is_none() {
|
||||
shared.borrow_mut().io_task = Some(current_task());
|
||||
}
|
||||
}
|
||||
PayloadStatus::Pause
|
||||
}
|
||||
} else {
|
||||
@@ -176,6 +182,7 @@ struct Inner {
|
||||
items: VecDeque<Bytes>,
|
||||
capacity: usize,
|
||||
task: Option<Task>,
|
||||
io_task: Option<Task>,
|
||||
}
|
||||
|
||||
impl Inner {
|
||||
@@ -189,6 +196,7 @@ impl Inner {
|
||||
need_read: true,
|
||||
capacity: MAX_BUFFER_SIZE,
|
||||
task: None,
|
||||
io_task: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -248,6 +256,9 @@ impl Inner {
|
||||
if self.need_read && self.task.is_none() {
|
||||
self.task = Some(current_task());
|
||||
}
|
||||
if let Some(task) = self.io_task.take() {
|
||||
task.notify()
|
||||
}
|
||||
}
|
||||
Ok(Async::Ready(Some(data)))
|
||||
} else if let Some(err) = self.err.take() {
|
||||
@@ -261,6 +272,9 @@ impl Inner {
|
||||
if self.task.is_none() {
|
||||
self.task = Some(current_task());
|
||||
}
|
||||
if let Some(task) = self.io_task.take() {
|
||||
task.notify()
|
||||
}
|
||||
}
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
|
281
src/pipeline.rs
281
src/pipeline.rs
@@ -453,168 +453,171 @@ impl<S: 'static, H> ProcessResponse<S, H> {
|
||||
fn poll_io(mut self, io: &mut Writer, info: &mut PipelineInfo<S>)
|
||||
-> Result<PipelineState<S, H>, PipelineState<S, H>>
|
||||
{
|
||||
if self.drain.is_none() && self.running != RunningState::Paused {
|
||||
// if task is paused, write buffer is probably full
|
||||
'outter: loop {
|
||||
let result = match mem::replace(&mut self.iostate, IOState::Done) {
|
||||
IOState::Response => {
|
||||
let encoding = self.resp.content_encoding().unwrap_or(info.encoding);
|
||||
loop {
|
||||
if self.drain.is_none() && self.running != RunningState::Paused {
|
||||
// if task is paused, write buffer is probably full
|
||||
'inner: loop {
|
||||
let result = match mem::replace(&mut self.iostate, IOState::Done) {
|
||||
IOState::Response => {
|
||||
let encoding = self.resp.content_encoding().unwrap_or(info.encoding);
|
||||
|
||||
let result = match io.start(info.req_mut().get_inner(),
|
||||
&mut self.resp, encoding)
|
||||
{
|
||||
Ok(res) => res,
|
||||
Err(err) => {
|
||||
info.error = Some(err.into());
|
||||
return Ok(FinishingMiddlewares::init(info, self.resp))
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(err) = self.resp.error() {
|
||||
if self.resp.status().is_server_error() {
|
||||
error!("Error occured during request handling: {}", err);
|
||||
} else {
|
||||
warn!("Error occured during request handling: {}", err);
|
||||
}
|
||||
if log_enabled!(Debug) {
|
||||
debug!("{:?}", err);
|
||||
}
|
||||
}
|
||||
|
||||
// always poll stream or actor for the first time
|
||||
match self.resp.replace_body(Body::Empty) {
|
||||
Body::Streaming(stream) => {
|
||||
self.iostate = IOState::Payload(stream);
|
||||
continue
|
||||
},
|
||||
Body::Actor(ctx) => {
|
||||
self.iostate = IOState::Actor(ctx);
|
||||
continue
|
||||
},
|
||||
_ => (),
|
||||
}
|
||||
|
||||
result
|
||||
},
|
||||
IOState::Payload(mut body) => {
|
||||
match body.poll() {
|
||||
Ok(Async::Ready(None)) => {
|
||||
if let Err(err) = io.write_eof() {
|
||||
let result = match io.start(info.req_mut().get_inner(),
|
||||
&mut self.resp, encoding)
|
||||
{
|
||||
Ok(res) => res,
|
||||
Err(err) => {
|
||||
info.error = Some(err.into());
|
||||
return Ok(FinishingMiddlewares::init(info, self.resp))
|
||||
}
|
||||
break
|
||||
};
|
||||
|
||||
if let Some(err) = self.resp.error() {
|
||||
if self.resp.status().is_server_error() {
|
||||
error!("Error occured during request handling: {}", err);
|
||||
} else {
|
||||
warn!("Error occured during request handling: {}", err);
|
||||
}
|
||||
if log_enabled!(Debug) {
|
||||
debug!("{:?}", err);
|
||||
}
|
||||
}
|
||||
|
||||
// always poll stream or actor for the first time
|
||||
match self.resp.replace_body(Body::Empty) {
|
||||
Body::Streaming(stream) => {
|
||||
self.iostate = IOState::Payload(stream);
|
||||
continue 'inner
|
||||
},
|
||||
Body::Actor(ctx) => {
|
||||
self.iostate = IOState::Actor(ctx);
|
||||
continue 'inner
|
||||
},
|
||||
Ok(Async::Ready(Some(chunk))) => {
|
||||
self.iostate = IOState::Payload(body);
|
||||
match io.write(chunk.into()) {
|
||||
Err(err) => {
|
||||
_ => (),
|
||||
}
|
||||
|
||||
result
|
||||
},
|
||||
IOState::Payload(mut body) => {
|
||||
match body.poll() {
|
||||
Ok(Async::Ready(None)) => {
|
||||
if let Err(err) = io.write_eof() {
|
||||
info.error = Some(err.into());
|
||||
return Ok(FinishingMiddlewares::init(info, self.resp))
|
||||
},
|
||||
Ok(result) => result
|
||||
}
|
||||
}
|
||||
Ok(Async::NotReady) => {
|
||||
self.iostate = IOState::Payload(body);
|
||||
break
|
||||
},
|
||||
Err(err) => {
|
||||
info.error = Some(err);
|
||||
return Ok(FinishingMiddlewares::init(info, self.resp))
|
||||
}
|
||||
}
|
||||
},
|
||||
IOState::Actor(mut ctx) => {
|
||||
if info.disconnected.take().is_some() {
|
||||
ctx.disconnected();
|
||||
}
|
||||
match ctx.poll() {
|
||||
Ok(Async::Ready(Some(vec))) => {
|
||||
if vec.is_empty() {
|
||||
self.iostate = IOState::Actor(ctx);
|
||||
}
|
||||
break
|
||||
}
|
||||
let mut res = None;
|
||||
for frame in vec {
|
||||
match frame {
|
||||
Frame::Chunk(None) => {
|
||||
info.context = Some(ctx);
|
||||
if let Err(err) = io.write_eof() {
|
||||
info.error = Some(err.into());
|
||||
return Ok(
|
||||
FinishingMiddlewares::init(info, self.resp))
|
||||
}
|
||||
break 'outter
|
||||
},
|
||||
Ok(Async::Ready(Some(chunk))) => {
|
||||
self.iostate = IOState::Payload(body);
|
||||
match io.write(chunk.into()) {
|
||||
Err(err) => {
|
||||
info.error = Some(err.into());
|
||||
return Ok(FinishingMiddlewares::init(info, self.resp))
|
||||
},
|
||||
Frame::Chunk(Some(chunk)) => {
|
||||
match io.write(chunk) {
|
||||
Err(err) => {
|
||||
Ok(result) => result
|
||||
}
|
||||
}
|
||||
Ok(Async::NotReady) => {
|
||||
self.iostate = IOState::Payload(body);
|
||||
break
|
||||
},
|
||||
Err(err) => {
|
||||
info.error = Some(err);
|
||||
return Ok(FinishingMiddlewares::init(info, self.resp))
|
||||
}
|
||||
}
|
||||
},
|
||||
IOState::Actor(mut ctx) => {
|
||||
if info.disconnected.take().is_some() {
|
||||
ctx.disconnected();
|
||||
}
|
||||
match ctx.poll() {
|
||||
Ok(Async::Ready(Some(vec))) => {
|
||||
if vec.is_empty() {
|
||||
self.iostate = IOState::Actor(ctx);
|
||||
break
|
||||
}
|
||||
let mut res = None;
|
||||
for frame in vec {
|
||||
match frame {
|
||||
Frame::Chunk(None) => {
|
||||
info.context = Some(ctx);
|
||||
if let Err(err) = io.write_eof() {
|
||||
info.error = Some(err.into());
|
||||
return Ok(
|
||||
FinishingMiddlewares::init(info, self.resp))
|
||||
},
|
||||
Ok(result) => res = Some(result),
|
||||
}
|
||||
},
|
||||
Frame::Drain(fut) =>
|
||||
self.drain = Some(fut),
|
||||
}
|
||||
break 'inner
|
||||
},
|
||||
Frame::Chunk(Some(chunk)) => {
|
||||
match io.write(chunk) {
|
||||
Err(err) => {
|
||||
info.error = Some(err.into());
|
||||
return Ok(
|
||||
FinishingMiddlewares::init(info, self.resp))
|
||||
},
|
||||
Ok(result) => res = Some(result),
|
||||
}
|
||||
},
|
||||
Frame::Drain(fut) => self.drain = Some(fut),
|
||||
}
|
||||
}
|
||||
self.iostate = IOState::Actor(ctx);
|
||||
if self.drain.is_some() {
|
||||
self.running.resume();
|
||||
break 'inner
|
||||
}
|
||||
res.unwrap()
|
||||
},
|
||||
Ok(Async::Ready(None)) => {
|
||||
break
|
||||
}
|
||||
self.iostate = IOState::Actor(ctx);
|
||||
if self.drain.is_some() {
|
||||
self.running.resume();
|
||||
break 'outter
|
||||
Ok(Async::NotReady) => {
|
||||
self.iostate = IOState::Actor(ctx);
|
||||
break
|
||||
}
|
||||
Err(err) => {
|
||||
info.error = Some(err);
|
||||
return Ok(FinishingMiddlewares::init(info, self.resp))
|
||||
}
|
||||
res.unwrap()
|
||||
},
|
||||
Ok(Async::Ready(None)) => {
|
||||
break
|
||||
}
|
||||
Ok(Async::NotReady) => {
|
||||
self.iostate = IOState::Actor(ctx);
|
||||
break
|
||||
}
|
||||
Err(err) => {
|
||||
info.error = Some(err);
|
||||
return Ok(FinishingMiddlewares::init(info, self.resp))
|
||||
}
|
||||
}
|
||||
}
|
||||
IOState::Done => break,
|
||||
};
|
||||
IOState::Done => break,
|
||||
};
|
||||
|
||||
match result {
|
||||
WriterState::Pause => {
|
||||
self.running.pause();
|
||||
break
|
||||
match result {
|
||||
WriterState::Pause => {
|
||||
self.running.pause();
|
||||
break
|
||||
}
|
||||
WriterState::Done => {
|
||||
self.running.resume()
|
||||
},
|
||||
}
|
||||
WriterState::Done => {
|
||||
self.running.resume()
|
||||
}
|
||||
}
|
||||
|
||||
// flush io but only if we need to
|
||||
if self.running == RunningState::Paused || self.drain.is_some() {
|
||||
match io.poll_completed(false) {
|
||||
Ok(Async::Ready(_)) => {
|
||||
self.running.resume();
|
||||
|
||||
// resolve drain futures
|
||||
if let Some(tx) = self.drain.take() {
|
||||
let _ = tx.send(());
|
||||
}
|
||||
// restart io processing
|
||||
continue
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// flush io but only if we need to
|
||||
if self.running == RunningState::Paused || self.drain.is_some() {
|
||||
match io.poll_completed(false) {
|
||||
Ok(Async::Ready(_)) => {
|
||||
self.running.resume();
|
||||
|
||||
// resolve drain futures
|
||||
if let Some(tx) = self.drain.take() {
|
||||
let _ = tx.send(());
|
||||
Ok(Async::NotReady) =>
|
||||
return Err(PipelineState::Response(self)),
|
||||
Err(err) => {
|
||||
info.error = Some(err.into());
|
||||
return Ok(FinishingMiddlewares::init(info, self.resp))
|
||||
}
|
||||
// restart io processing
|
||||
return self.poll_io(io, info);
|
||||
},
|
||||
Ok(Async::NotReady) => return Err(PipelineState::Response(self)),
|
||||
Err(err) => {
|
||||
info.error = Some(err.into());
|
||||
return Ok(FinishingMiddlewares::init(info, self.resp))
|
||||
}
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
// response is completed
|
||||
|
@@ -368,6 +368,7 @@ impl ContentEncoder {
|
||||
response_encoding: ContentEncoding) -> ContentEncoder
|
||||
{
|
||||
let version = resp.version().unwrap_or_else(|| req.version);
|
||||
let is_head = req.method == Method::HEAD;
|
||||
let mut body = resp.replace_body(Body::Empty);
|
||||
let has_body = match body {
|
||||
Body::Empty => false,
|
||||
@@ -410,7 +411,9 @@ impl ContentEncoder {
|
||||
TransferEncoding::length(0, buf)
|
||||
},
|
||||
Body::Binary(ref mut bytes) => {
|
||||
if encoding.is_compression() {
|
||||
if !(encoding == ContentEncoding::Identity
|
||||
|| encoding == ContentEncoding::Auto)
|
||||
{
|
||||
let tmp = SharedBytes::default();
|
||||
let transfer = TransferEncoding::eof(tmp.clone());
|
||||
let mut enc = match encoding {
|
||||
@@ -431,13 +434,13 @@ impl ContentEncoder {
|
||||
*bytes = Binary::from(tmp.take());
|
||||
encoding = ContentEncoding::Identity;
|
||||
}
|
||||
if req.method == Method::HEAD {
|
||||
if is_head {
|
||||
let mut b = BytesMut::new();
|
||||
let _ = write!(b, "{}", bytes.len());
|
||||
resp.headers_mut().insert(
|
||||
CONTENT_LENGTH, HeaderValue::try_from(b.freeze()).unwrap());
|
||||
} else {
|
||||
resp.headers_mut().remove(CONTENT_LENGTH);
|
||||
// resp.headers_mut().remove(CONTENT_LENGTH);
|
||||
}
|
||||
TransferEncoding::eof(buf)
|
||||
}
|
||||
@@ -460,7 +463,7 @@ impl ContentEncoder {
|
||||
}
|
||||
};
|
||||
//
|
||||
if req.method == Method::HEAD {
|
||||
if is_head {
|
||||
transfer.kind = TransferEncodingKind::Length(0);
|
||||
} else {
|
||||
resp.replace_body(body);
|
||||
|
@@ -51,7 +51,7 @@ pub(crate) struct Http1<T: IoStream, H: 'static> {
|
||||
flags: Flags,
|
||||
settings: Rc<WorkerSettings<H>>,
|
||||
addr: Option<SocketAddr>,
|
||||
stream: H1Writer<T>,
|
||||
stream: H1Writer<T, H>,
|
||||
reader: Reader,
|
||||
read_buf: BytesMut,
|
||||
tasks: VecDeque<Entry>,
|
||||
@@ -72,7 +72,7 @@ impl<T, H> Http1<T, H>
|
||||
{
|
||||
let bytes = settings.get_shared_bytes();
|
||||
Http1{ flags: Flags::KEEPALIVE,
|
||||
stream: H1Writer::new(stream, bytes),
|
||||
stream: H1Writer::new(stream, bytes, Rc::clone(&settings)),
|
||||
reader: Reader::new(),
|
||||
tasks: VecDeque::new(),
|
||||
keepalive_timer: None,
|
||||
@@ -353,7 +353,7 @@ impl Reader {
|
||||
PayloadStatus::Read
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[inline]
|
||||
fn decode(&mut self, buf: &mut BytesMut, payload: &mut PayloadInfo)
|
||||
-> Result<Decoding, ReaderError>
|
||||
@@ -490,6 +490,8 @@ impl Reader {
|
||||
fn parse_message<H>(buf: &mut BytesMut, settings: &WorkerSettings<H>)
|
||||
-> Poll<(HttpRequest, Option<PayloadInfo>), ParseError> {
|
||||
// Parse http message
|
||||
let mut has_te = false;
|
||||
let mut has_upgrade = false;
|
||||
let msg = {
|
||||
let bytes_ptr = buf.as_ref().as_ptr() as usize;
|
||||
let mut headers: [httparse::Header; MAX_HEADERS] =
|
||||
@@ -500,13 +502,9 @@ impl Reader {
|
||||
let mut req = httparse::Request::new(&mut headers);
|
||||
match req.parse(b)? {
|
||||
httparse::Status::Complete(len) => {
|
||||
let method = Method::try_from(req.method.unwrap())
|
||||
let method = Method::from_bytes(req.method.unwrap().as_bytes())
|
||||
.map_err(|_| ParseError::Method)?;
|
||||
let path = req.path.unwrap();
|
||||
let path_start = path.as_ptr() as usize - bytes_ptr;
|
||||
let path_end = path_start + path.len();
|
||||
let path = (path_start, path_end);
|
||||
|
||||
let path = Uri::try_from(req.path.unwrap()).unwrap();
|
||||
let version = if req.version.unwrap() == 1 {
|
||||
Version::HTTP_11
|
||||
} else {
|
||||
@@ -522,28 +520,33 @@ impl Reader {
|
||||
|
||||
// convert headers
|
||||
let msg = settings.get_http_message();
|
||||
for header in headers[..headers_len].iter() {
|
||||
if let Ok(name) = HeaderName::try_from(header.name) {
|
||||
let v_start = header.value.as_ptr() as usize - bytes_ptr;
|
||||
let v_end = v_start + header.value.len();
|
||||
let value = unsafe {
|
||||
HeaderValue::from_shared_unchecked(slice.slice(v_start, v_end)) };
|
||||
msg.get_mut().headers.append(name, value);
|
||||
} else {
|
||||
return Err(ParseError::Header)
|
||||
{
|
||||
let msg_mut = msg.get_mut();
|
||||
for header in headers[..headers_len].iter() {
|
||||
if let Ok(name) = HeaderName::from_bytes(header.name.as_bytes()) {
|
||||
has_te = has_te || name == header::TRANSFER_ENCODING;
|
||||
has_upgrade = has_upgrade || name == header::UPGRADE;
|
||||
let v_start = header.value.as_ptr() as usize - bytes_ptr;
|
||||
let v_end = v_start + header.value.len();
|
||||
let value = unsafe {
|
||||
HeaderValue::from_shared_unchecked(
|
||||
slice.slice(v_start, v_end)) };
|
||||
msg_mut.headers.append(name, value);
|
||||
} else {
|
||||
return Err(ParseError::Header)
|
||||
}
|
||||
}
|
||||
|
||||
msg_mut.uri = path;
|
||||
msg_mut.method = method;
|
||||
msg_mut.version = version;
|
||||
}
|
||||
|
||||
let path = slice.slice(path.0, path.1);
|
||||
let uri = Uri::from_shared(path).map_err(ParseError::Uri)?;
|
||||
|
||||
msg.get_mut().uri = uri;
|
||||
msg.get_mut().method = method;
|
||||
msg.get_mut().version = version;
|
||||
msg
|
||||
};
|
||||
|
||||
let decoder = if let Some(len) = msg.get_ref().headers.get(header::CONTENT_LENGTH) {
|
||||
let decoder = if let Some(len) =
|
||||
msg.get_ref().headers.get(header::CONTENT_LENGTH)
|
||||
{
|
||||
// Content-Length
|
||||
if let Ok(s) = len.to_str() {
|
||||
if let Ok(len) = s.parse::<u64>() {
|
||||
@@ -556,12 +559,10 @@ impl Reader {
|
||||
debug!("illegal Content-Length: {:?}", len);
|
||||
return Err(ParseError::Header)
|
||||
}
|
||||
} else if chunked(&msg.get_mut().headers)? {
|
||||
} else if has_te && chunked(&msg.get_mut().headers)? {
|
||||
// Chunked encoding
|
||||
Some(Decoder::chunked())
|
||||
} else if msg.get_ref().headers.contains_key(header::UPGRADE) ||
|
||||
msg.get_ref().method == Method::CONNECT
|
||||
{
|
||||
} else if has_upgrade || msg.get_ref().method == Method::CONNECT {
|
||||
Some(Decoder::eof())
|
||||
} else {
|
||||
None
|
||||
@@ -570,7 +571,7 @@ impl Reader {
|
||||
if let Some(decoder) = decoder {
|
||||
let (psender, payload) = Payload::new(false);
|
||||
let info = PayloadInfo {
|
||||
tx: PayloadType::new(&msg.get_mut().headers, psender),
|
||||
tx: PayloadType::new(&msg.get_ref().headers, psender),
|
||||
decoder,
|
||||
};
|
||||
msg.get_mut().payload = Some(payload);
|
||||
|
@@ -1,11 +1,12 @@
|
||||
#![cfg_attr(feature = "cargo-clippy", allow(redundant_field_names))]
|
||||
|
||||
use std::{io, mem};
|
||||
use std::rc::Rc;
|
||||
use bytes::BufMut;
|
||||
use futures::{Async, Poll};
|
||||
use tokio_io::AsyncWrite;
|
||||
use http::{Method, Version};
|
||||
use http::header::{HeaderValue, CONNECTION, DATE};
|
||||
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE};
|
||||
|
||||
use helpers;
|
||||
use body::{Body, Binary};
|
||||
@@ -15,6 +16,7 @@ use httpresponse::HttpResponse;
|
||||
use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE};
|
||||
use super::shared::SharedBytes;
|
||||
use super::encoding::ContentEncoder;
|
||||
use super::settings::WorkerSettings;
|
||||
|
||||
const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific
|
||||
|
||||
@@ -27,7 +29,7 @@ bitflags! {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct H1Writer<T: AsyncWrite> {
|
||||
pub(crate) struct H1Writer<T: AsyncWrite, H: 'static> {
|
||||
flags: Flags,
|
||||
stream: T,
|
||||
encoder: ContentEncoder,
|
||||
@@ -35,11 +37,14 @@ pub(crate) struct H1Writer<T: AsyncWrite> {
|
||||
headers_size: u32,
|
||||
buffer: SharedBytes,
|
||||
buffer_capacity: usize,
|
||||
settings: Rc<WorkerSettings<H>>,
|
||||
}
|
||||
|
||||
impl<T: AsyncWrite> H1Writer<T> {
|
||||
impl<T: AsyncWrite, H: 'static> H1Writer<T, H> {
|
||||
|
||||
pub fn new(stream: T, buf: SharedBytes) -> H1Writer<T> {
|
||||
pub fn new(stream: T, buf: SharedBytes, settings: Rc<WorkerSettings<H>>)
|
||||
-> H1Writer<T, H>
|
||||
{
|
||||
H1Writer {
|
||||
flags: Flags::empty(),
|
||||
encoder: ContentEncoder::empty(buf.clone()),
|
||||
@@ -48,6 +53,7 @@ impl<T: AsyncWrite> H1Writer<T> {
|
||||
buffer: buf,
|
||||
buffer_capacity: 0,
|
||||
stream,
|
||||
settings,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -76,7 +82,9 @@ impl<T: AsyncWrite> H1Writer<T> {
|
||||
self.disconnected();
|
||||
return Err(io::Error::new(io::ErrorKind::WriteZero, ""))
|
||||
},
|
||||
Ok(n) => written += n,
|
||||
Ok(n) => {
|
||||
written += n;
|
||||
},
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||
return Ok(written)
|
||||
}
|
||||
@@ -87,7 +95,7 @@ impl<T: AsyncWrite> H1Writer<T> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsyncWrite> Writer for H1Writer<T> {
|
||||
impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
|
||||
|
||||
#[inline]
|
||||
fn written(&self) -> u64 {
|
||||
@@ -126,11 +134,14 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
|
||||
// render message
|
||||
{
|
||||
let mut buffer = self.buffer.get_mut();
|
||||
if let Body::Binary(ref bytes) = body {
|
||||
buffer.reserve(256 + msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len());
|
||||
let mut is_bin = if let Body::Binary(ref bytes) = body {
|
||||
buffer.reserve(
|
||||
256 + msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len());
|
||||
true
|
||||
} else {
|
||||
buffer.reserve(256 + msg.headers().len() * AVERAGE_HEADER_SIZE);
|
||||
}
|
||||
false
|
||||
};
|
||||
|
||||
// status line
|
||||
helpers::write_status_line(version, msg.status().as_u16(), &mut buffer);
|
||||
@@ -139,21 +150,28 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
|
||||
match body {
|
||||
Body::Empty =>
|
||||
if req.method != Method::HEAD {
|
||||
SharedBytes::extend_from_slice_(buffer, b"\r\ncontent-length: 0\r\n");
|
||||
SharedBytes::put_slice(
|
||||
buffer, b"\r\ncontent-length: 0\r\n");
|
||||
} else {
|
||||
SharedBytes::extend_from_slice_(buffer, b"\r\n");
|
||||
SharedBytes::put_slice(buffer, b"\r\n");
|
||||
},
|
||||
Body::Binary(ref bytes) =>
|
||||
helpers::write_content_length(bytes.len(), &mut buffer),
|
||||
_ =>
|
||||
SharedBytes::extend_from_slice_(buffer, b"\r\n"),
|
||||
SharedBytes::put_slice(buffer, b"\r\n"),
|
||||
}
|
||||
|
||||
// write headers
|
||||
let mut pos = 0;
|
||||
let mut has_date = false;
|
||||
let mut remaining = buffer.remaining_mut();
|
||||
let mut buf: &mut [u8] = unsafe{ mem::transmute(buffer.bytes_mut()) };
|
||||
for (key, value) in msg.headers() {
|
||||
if is_bin && key == CONTENT_LENGTH {
|
||||
is_bin = false;
|
||||
continue
|
||||
}
|
||||
has_date = has_date || key == DATE;
|
||||
let v = value.as_ref();
|
||||
let k = key.as_str().as_bytes();
|
||||
let len = k.len() + v.len() + 4;
|
||||
@@ -182,9 +200,9 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
|
||||
}
|
||||
unsafe{buffer.advance_mut(pos)};
|
||||
|
||||
// using helpers::date is quite a lot faster
|
||||
if !msg.headers().contains_key(DATE) {
|
||||
helpers::date(&mut buffer);
|
||||
// optimized date header
|
||||
if !has_date {
|
||||
self.settings.set_date(&mut buffer);
|
||||
} else {
|
||||
// msg eof
|
||||
SharedBytes::extend_from_slice_(buffer, b"\r\n");
|
||||
@@ -211,9 +229,10 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
|
||||
// shortcut for upgraded connection
|
||||
if self.flags.contains(Flags::UPGRADE) {
|
||||
if self.buffer.is_empty() {
|
||||
let n = self.write_data(payload.as_ref())?;
|
||||
if payload.len() < n {
|
||||
self.buffer.extend_from_slice(&payload.as_ref()[n..]);
|
||||
let pl: &[u8] = payload.as_ref();
|
||||
let n = self.write_data(pl)?;
|
||||
if n < pl.len() {
|
||||
self.buffer.extend_from_slice(&pl[n..]);
|
||||
return Ok(WriterState::Done);
|
||||
}
|
||||
} else {
|
||||
@@ -224,7 +243,7 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
|
||||
self.encoder.write(payload)?;
|
||||
}
|
||||
} else {
|
||||
// might be response to EXCEPT
|
||||
// could be response to EXCEPT header
|
||||
self.buffer.extend_from_slice(payload.as_ref())
|
||||
}
|
||||
}
|
||||
|
@@ -43,7 +43,7 @@ struct Http2<T, H>
|
||||
settings: Rc<WorkerSettings<H>>,
|
||||
addr: Option<SocketAddr>,
|
||||
state: State<IoWrapper<T>>,
|
||||
tasks: VecDeque<Entry>,
|
||||
tasks: VecDeque<Entry<H>>,
|
||||
keepalive_timer: Option<Timeout>,
|
||||
}
|
||||
|
||||
@@ -274,20 +274,20 @@ bitflags! {
|
||||
}
|
||||
}
|
||||
|
||||
struct Entry {
|
||||
struct Entry<H: 'static> {
|
||||
task: Box<HttpHandlerTask>,
|
||||
payload: PayloadType,
|
||||
recv: RecvStream,
|
||||
stream: H2Writer,
|
||||
stream: H2Writer<H>,
|
||||
flags: EntryFlags,
|
||||
}
|
||||
|
||||
impl Entry {
|
||||
fn new<H>(parts: Parts,
|
||||
recv: RecvStream,
|
||||
resp: SendResponse<Bytes>,
|
||||
addr: Option<SocketAddr>,
|
||||
settings: &Rc<WorkerSettings<H>>) -> Entry
|
||||
impl<H: 'static> Entry<H> {
|
||||
fn new(parts: Parts,
|
||||
recv: RecvStream,
|
||||
resp: SendResponse<Bytes>,
|
||||
addr: Option<SocketAddr>,
|
||||
settings: &Rc<WorkerSettings<H>>) -> Entry<H>
|
||||
where H: HttpHandler + 'static
|
||||
{
|
||||
// Payload and Content-Encoding
|
||||
@@ -320,7 +320,8 @@ impl Entry {
|
||||
|
||||
Entry {task: task.unwrap_or_else(|| Pipeline::error(HttpNotFound)),
|
||||
payload: psender,
|
||||
stream: H2Writer::new(resp, settings.get_shared_bytes()),
|
||||
stream: H2Writer::new(
|
||||
resp, settings.get_shared_bytes(), Rc::clone(settings)),
|
||||
flags: EntryFlags::empty(),
|
||||
recv,
|
||||
}
|
||||
|
@@ -1,6 +1,7 @@
|
||||
#![cfg_attr(feature = "cargo-clippy", allow(redundant_field_names))]
|
||||
|
||||
use std::{io, cmp};
|
||||
use std::rc::Rc;
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use futures::{Async, Poll};
|
||||
use http2::{Reason, SendStream};
|
||||
@@ -15,6 +16,7 @@ use httprequest::HttpInnerMessage;
|
||||
use httpresponse::HttpResponse;
|
||||
use super::encoding::ContentEncoder;
|
||||
use super::shared::SharedBytes;
|
||||
use super::settings::WorkerSettings;
|
||||
use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE};
|
||||
|
||||
const CHUNK_SIZE: usize = 16_384;
|
||||
@@ -28,7 +30,7 @@ bitflags! {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct H2Writer {
|
||||
pub(crate) struct H2Writer<H: 'static> {
|
||||
respond: SendResponse<Bytes>,
|
||||
stream: Option<SendStream<Bytes>>,
|
||||
encoder: ContentEncoder,
|
||||
@@ -36,13 +38,17 @@ pub(crate) struct H2Writer {
|
||||
written: u64,
|
||||
buffer: SharedBytes,
|
||||
buffer_capacity: usize,
|
||||
settings: Rc<WorkerSettings<H>>,
|
||||
}
|
||||
|
||||
impl H2Writer {
|
||||
impl<H: 'static> H2Writer<H> {
|
||||
|
||||
pub fn new(respond: SendResponse<Bytes>, buf: SharedBytes) -> H2Writer {
|
||||
pub fn new(respond: SendResponse<Bytes>,
|
||||
buf: SharedBytes, settings: Rc<WorkerSettings<H>>) -> H2Writer<H>
|
||||
{
|
||||
H2Writer {
|
||||
respond,
|
||||
settings,
|
||||
stream: None,
|
||||
encoder: ContentEncoder::empty(buf.clone()),
|
||||
flags: Flags::empty(),
|
||||
@@ -59,7 +65,7 @@ impl H2Writer {
|
||||
}
|
||||
}
|
||||
|
||||
impl Writer for H2Writer {
|
||||
impl<H: 'static> Writer for H2Writer<H> {
|
||||
|
||||
fn written(&self) -> u64 {
|
||||
self.written
|
||||
@@ -84,7 +90,7 @@ impl Writer for H2Writer {
|
||||
// using helpers::date is quite a lot faster
|
||||
if !msg.headers().contains_key(DATE) {
|
||||
let mut bytes = BytesMut::with_capacity(29);
|
||||
helpers::date_value(&mut bytes);
|
||||
self.settings.set_date_simple(&mut bytes);
|
||||
msg.headers_mut().insert(DATE, HeaderValue::try_from(bytes.freeze()).unwrap());
|
||||
}
|
||||
|
||||
@@ -95,7 +101,8 @@ impl Writer for H2Writer {
|
||||
helpers::convert_usize(bytes.len(), &mut val);
|
||||
let l = val.len();
|
||||
msg.headers_mut().insert(
|
||||
CONTENT_LENGTH, HeaderValue::try_from(val.split_to(l-2).freeze()).unwrap());
|
||||
CONTENT_LENGTH,
|
||||
HeaderValue::try_from(val.split_to(l-2).freeze()).unwrap());
|
||||
}
|
||||
Body::Empty => {
|
||||
msg.headers_mut().insert(CONTENT_LENGTH, HeaderValue::from_static("0"));
|
||||
|
@@ -1,7 +1,10 @@
|
||||
use std::{fmt, net};
|
||||
use std::{fmt, mem, net};
|
||||
use std::fmt::Write;
|
||||
use std::rc::Rc;
|
||||
use std::sync::Arc;
|
||||
use std::cell::{Cell, RefCell, RefMut, UnsafeCell};
|
||||
use time;
|
||||
use bytes::BytesMut;
|
||||
use futures_cpupool::{Builder, CpuPool};
|
||||
|
||||
use helpers;
|
||||
@@ -95,6 +98,8 @@ impl ServerSettings {
|
||||
}
|
||||
}
|
||||
|
||||
// "Sun, 06 Nov 1994 08:49:37 GMT".len()
|
||||
const DATE_VALUE_LENGTH: usize = 29;
|
||||
|
||||
pub(crate) struct WorkerSettings<H> {
|
||||
h: RefCell<Vec<H>>,
|
||||
@@ -104,6 +109,7 @@ pub(crate) struct WorkerSettings<H> {
|
||||
messages: Rc<helpers::SharedMessagePool>,
|
||||
channels: Cell<usize>,
|
||||
node: Box<Node<()>>,
|
||||
date: UnsafeCell<Date>,
|
||||
}
|
||||
|
||||
impl<H> WorkerSettings<H> {
|
||||
@@ -121,6 +127,7 @@ impl<H> WorkerSettings<H> {
|
||||
messages: Rc::new(helpers::SharedMessagePool::new()),
|
||||
channels: Cell::new(0),
|
||||
node: Box::new(Node::head()),
|
||||
date: UnsafeCell::new(Date::new()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -164,4 +171,67 @@ impl<H> WorkerSettings<H> {
|
||||
error!("Number of removed channels is bigger than added channel. Bug in actix-web");
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update_date(&self) {
|
||||
unsafe{&mut *self.date.get()}.update();
|
||||
}
|
||||
|
||||
pub fn set_date(&self, dst: &mut BytesMut) {
|
||||
let mut buf: [u8; 39] = unsafe { mem::uninitialized() };
|
||||
buf[..6].copy_from_slice(b"date: ");
|
||||
buf[6..35].copy_from_slice(&(unsafe{&*self.date.get()}.bytes));
|
||||
buf[35..].copy_from_slice(b"\r\n\r\n");
|
||||
dst.extend_from_slice(&buf);
|
||||
}
|
||||
|
||||
pub fn set_date_simple(&self, dst: &mut BytesMut) {
|
||||
dst.extend_from_slice(&(unsafe{&*self.date.get()}.bytes));
|
||||
}
|
||||
}
|
||||
|
||||
struct Date {
|
||||
bytes: [u8; DATE_VALUE_LENGTH],
|
||||
pos: usize,
|
||||
}
|
||||
|
||||
impl Date {
|
||||
fn new() -> Date {
|
||||
let mut date = Date{bytes: [0; DATE_VALUE_LENGTH], pos: 0};
|
||||
date.update();
|
||||
date
|
||||
}
|
||||
fn update(&mut self) {
|
||||
self.pos = 0;
|
||||
write!(self, "{}", time::at_utc(time::get_time()).rfc822()).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Write for Date {
|
||||
fn write_str(&mut self, s: &str) -> fmt::Result {
|
||||
let len = s.len();
|
||||
self.bytes[self.pos..self.pos + len].copy_from_slice(s.as_bytes());
|
||||
self.pos += len;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_date_len() {
|
||||
assert_eq!(DATE_VALUE_LENGTH, "Sun, 06 Nov 1994 08:49:37 GMT".len());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_date() {
|
||||
let settings = WorkerSettings::<()>::new(Vec::new(), KeepAlive::Os);
|
||||
let mut buf1 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10);
|
||||
settings.set_date(&mut buf1);
|
||||
let mut buf2 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10);
|
||||
settings.set_date(&mut buf2);
|
||||
assert_eq!(buf1, buf2);
|
||||
}
|
||||
}
|
||||
|
@@ -18,7 +18,6 @@ use native_tls::TlsAcceptor;
|
||||
#[cfg(feature="alpn")]
|
||||
use openssl::ssl::{AlpnError, SslAcceptorBuilder};
|
||||
|
||||
use helpers;
|
||||
use super::{IntoHttpHandler, IoStream, KeepAlive};
|
||||
use super::{PauseServer, ResumeServer, StopServer};
|
||||
use super::channel::{HttpChannel, WrapperStream};
|
||||
@@ -58,13 +57,8 @@ enum ServerCommand {
|
||||
WorkerDied(usize, Info),
|
||||
}
|
||||
|
||||
impl<H> Actor for HttpServer<H> where H: IntoHttpHandler
|
||||
{
|
||||
impl<H> Actor for HttpServer<H> where H: IntoHttpHandler {
|
||||
type Context = Context<Self>;
|
||||
|
||||
fn started(&mut self, ctx: &mut Self::Context) {
|
||||
self.update_time(ctx);
|
||||
}
|
||||
}
|
||||
|
||||
impl<H> HttpServer<H> where H: IntoHttpHandler + 'static
|
||||
@@ -95,11 +89,6 @@ impl<H> HttpServer<H> where H: IntoHttpHandler + 'static
|
||||
}
|
||||
}
|
||||
|
||||
fn update_time(&self, ctx: &mut Context<Self>) {
|
||||
helpers::update_date();
|
||||
ctx.run_later(Duration::new(1, 0), |slf, ctx| slf.update_time(ctx));
|
||||
}
|
||||
|
||||
/// Set number of workers to start.
|
||||
///
|
||||
/// By default http server uses number of available logical cpu as threads count.
|
||||
|
@@ -22,7 +22,6 @@ use tokio_openssl::SslAcceptorExt;
|
||||
use actix::*;
|
||||
use actix::msgs::StopArbiter;
|
||||
|
||||
use helpers;
|
||||
use server::{HttpHandler, KeepAlive};
|
||||
use server::channel::HttpChannel;
|
||||
use server::settings::WorkerSettings;
|
||||
@@ -76,7 +75,7 @@ impl<H: HttpHandler + 'static> Worker<H> {
|
||||
}
|
||||
|
||||
fn update_time(&self, ctx: &mut Context<Self>) {
|
||||
helpers::update_date();
|
||||
self.settings.update_date();
|
||||
ctx.run_later(time::Duration::new(1, 0), |slf, ctx| slf.update_time(ctx));
|
||||
}
|
||||
|
||||
|
165
src/test.rs
165
src/test.rs
@@ -5,7 +5,7 @@ use std::rc::Rc;
|
||||
use std::sync::mpsc;
|
||||
use std::str::FromStr;
|
||||
|
||||
use actix::{Arbiter, Addr, Syn, System, SystemRunner, msgs};
|
||||
use actix::{Actor, Arbiter, Addr, Syn, System, SystemRunner, Unsync, msgs};
|
||||
use cookie::Cookie;
|
||||
use http::{Uri, Method, Version, HeaderMap, HttpTryFrom};
|
||||
use http::header::HeaderName;
|
||||
@@ -14,6 +14,9 @@ use tokio_core::net::TcpListener;
|
||||
use tokio_core::reactor::Core;
|
||||
use net2::TcpBuilder;
|
||||
|
||||
#[cfg(feature="alpn")]
|
||||
use openssl::ssl::SslAcceptor;
|
||||
|
||||
use ws;
|
||||
use body::Binary;
|
||||
use error::Error;
|
||||
@@ -27,7 +30,7 @@ use payload::Payload;
|
||||
use httprequest::HttpRequest;
|
||||
use httpresponse::HttpResponse;
|
||||
use server::{HttpServer, IntoHttpHandler, ServerSettings};
|
||||
use client::{ClientRequest, ClientRequestBuilder};
|
||||
use client::{ClientRequest, ClientRequestBuilder, ClientConnector};
|
||||
|
||||
/// The `TestServer` type.
|
||||
///
|
||||
@@ -60,6 +63,8 @@ pub struct TestServer {
|
||||
thread: Option<thread::JoinHandle<()>>,
|
||||
system: SystemRunner,
|
||||
server_sys: Addr<Syn, System>,
|
||||
ssl: bool,
|
||||
conn: Addr<Unsync, ClientConnector>,
|
||||
}
|
||||
|
||||
impl TestServer {
|
||||
@@ -69,9 +74,26 @@ impl TestServer {
|
||||
/// This method accepts configuration method. You can add
|
||||
/// middlewares or set handlers for test application.
|
||||
pub fn new<F>(config: F) -> Self
|
||||
where F: Sync + Send + 'static + Fn(&mut TestApp<()>),
|
||||
where F: Sync + Send + 'static + Fn(&mut TestApp<()>)
|
||||
{
|
||||
TestServer::with_state(||(), config)
|
||||
TestServerBuilder::new(||()).start(config)
|
||||
}
|
||||
|
||||
/// Create test server builder
|
||||
pub fn build() -> TestServerBuilder<()> {
|
||||
TestServerBuilder::new(||())
|
||||
}
|
||||
|
||||
/// Create test server builder with specific state factory
|
||||
///
|
||||
/// This method can be used for constructing application state.
|
||||
/// Also it can be used for external dependecy initialization,
|
||||
/// like creating sync actors for diesel integration.
|
||||
pub fn build_with_state<F, S>(state: F) -> TestServerBuilder<S>
|
||||
where F: Fn() -> S + Sync + Send + 'static,
|
||||
S: 'static,
|
||||
{
|
||||
TestServerBuilder::new(state)
|
||||
}
|
||||
|
||||
/// Start new test server with application factory
|
||||
@@ -98,15 +120,20 @@ impl TestServer {
|
||||
let _ = sys.run();
|
||||
});
|
||||
|
||||
let sys = System::new("actix-test");
|
||||
let (server_sys, addr) = rx.recv().unwrap();
|
||||
TestServer {
|
||||
addr,
|
||||
thread: Some(join),
|
||||
system: System::new("actix-test"),
|
||||
server_sys,
|
||||
ssl: false,
|
||||
conn: TestServer::get_conn(),
|
||||
thread: Some(join),
|
||||
system: sys,
|
||||
}
|
||||
}
|
||||
|
||||
#[deprecated(since="0.4.10",
|
||||
note="please use `TestServer::build_with_state()` instead")]
|
||||
/// Start new test server with custom application state
|
||||
///
|
||||
/// This method accepts state factory and configuration method.
|
||||
@@ -135,12 +162,30 @@ impl TestServer {
|
||||
let _ = sys.run();
|
||||
});
|
||||
|
||||
let system = System::new("actix-test");
|
||||
let (server_sys, addr) = rx.recv().unwrap();
|
||||
TestServer {
|
||||
addr,
|
||||
server_sys,
|
||||
system,
|
||||
ssl: false,
|
||||
conn: TestServer::get_conn(),
|
||||
thread: Some(join),
|
||||
system: System::new("actix-test"),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_conn() -> Addr<Unsync, ClientConnector> {
|
||||
#[cfg(feature="alpn")]
|
||||
{
|
||||
use openssl::ssl::{SslMethod, SslConnector, SslVerifyMode};
|
||||
|
||||
let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
|
||||
builder.set_verify(SslVerifyMode::NONE);
|
||||
ClientConnector::with_connector(builder.build()).start()
|
||||
}
|
||||
#[cfg(not(feature="alpn"))]
|
||||
{
|
||||
ClientConnector::default().start()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -162,9 +207,9 @@ impl TestServer {
|
||||
/// Construct test server url
|
||||
pub fn url(&self, uri: &str) -> String {
|
||||
if uri.starts_with('/') {
|
||||
format!("http://{}{}", self.addr, uri)
|
||||
format!("{}://{}{}", if self.ssl {"https"} else {"http"}, self.addr, uri)
|
||||
} else {
|
||||
format!("http://{}/{}", self.addr, uri)
|
||||
format!("{}://{}/{}", if self.ssl {"https"} else {"http"}, self.addr, uri)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -186,7 +231,8 @@ impl TestServer {
|
||||
/// Connect to websocket server
|
||||
pub fn ws(&mut self) -> Result<(ws::ClientReader, ws::ClientWriter), ws::ClientError> {
|
||||
let url = self.url("/");
|
||||
self.system.run_until_complete(ws::Client::new(url).connect())
|
||||
self.system.run_until_complete(
|
||||
ws::Client::with_connector(url, self.conn.clone()).connect())
|
||||
}
|
||||
|
||||
/// Create `GET` request
|
||||
@@ -208,7 +254,9 @@ impl TestServer {
|
||||
pub fn client(&self, meth: Method, path: &str) -> ClientRequestBuilder {
|
||||
ClientRequest::build()
|
||||
.method(meth)
|
||||
.uri(self.url(path).as_str()).take()
|
||||
.uri(self.url(path).as_str())
|
||||
.with_connector(self.conn.clone())
|
||||
.take()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -218,6 +266,101 @@ impl Drop for TestServer {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TestServerBuilder<S> {
|
||||
state: Box<Fn() -> S + Sync + Send + 'static>,
|
||||
#[cfg(feature="alpn")]
|
||||
ssl: Option<SslAcceptor>,
|
||||
}
|
||||
|
||||
impl<S: 'static> TestServerBuilder<S> {
|
||||
|
||||
pub fn new<F>(state: F) -> TestServerBuilder<S>
|
||||
where F: Fn() -> S + Sync + Send + 'static
|
||||
{
|
||||
TestServerBuilder {
|
||||
state: Box::new(state),
|
||||
#[cfg(feature="alpn")]
|
||||
ssl: None,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature="alpn")]
|
||||
/// Create ssl server
|
||||
pub fn ssl(mut self, ssl: SslAcceptor) -> Self {
|
||||
self.ssl = Some(ssl);
|
||||
self
|
||||
}
|
||||
|
||||
#[allow(unused_mut)]
|
||||
/// Configure test application and run test server
|
||||
pub fn start<F>(mut self, config: F) -> TestServer
|
||||
where F: Sync + Send + 'static + Fn(&mut TestApp<S>),
|
||||
{
|
||||
let (tx, rx) = mpsc::channel();
|
||||
|
||||
#[cfg(feature="alpn")]
|
||||
let ssl = self.ssl.is_some();
|
||||
#[cfg(not(feature="alpn"))]
|
||||
let ssl = false;
|
||||
|
||||
// run server in separate thread
|
||||
let join = thread::spawn(move || {
|
||||
let sys = System::new("actix-test-server");
|
||||
|
||||
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
|
||||
let local_addr = tcp.local_addr().unwrap();
|
||||
let tcp = TcpListener::from_listener(
|
||||
tcp, &local_addr, Arbiter::handle()).unwrap();
|
||||
|
||||
let state = self.state;
|
||||
|
||||
let srv = HttpServer::new(move || {
|
||||
let mut app = TestApp::new(state());
|
||||
config(&mut app);
|
||||
vec![app]})
|
||||
.disable_signals();
|
||||
|
||||
#[cfg(feature="alpn")]
|
||||
{
|
||||
use std::io;
|
||||
use futures::Stream;
|
||||
use tokio_openssl::SslAcceptorExt;
|
||||
|
||||
let ssl = self.ssl.take();
|
||||
if let Some(ssl) = ssl {
|
||||
srv.start_incoming(
|
||||
tcp.incoming()
|
||||
.and_then(move |(sock, addr)| {
|
||||
ssl.accept_async(sock)
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
|
||||
.map(move |s| (s, addr))
|
||||
}),
|
||||
false);
|
||||
} else {
|
||||
srv.start_incoming(tcp.incoming(), false);
|
||||
}
|
||||
}
|
||||
#[cfg(not(feature="alpn"))]
|
||||
{
|
||||
srv.start_incoming(tcp.incoming(), false);
|
||||
}
|
||||
|
||||
tx.send((Arbiter::system(), local_addr)).unwrap();
|
||||
let _ = sys.run();
|
||||
});
|
||||
|
||||
let system = System::new("actix-test");
|
||||
let (server_sys, addr) = rx.recv().unwrap();
|
||||
TestServer {
|
||||
addr,
|
||||
server_sys,
|
||||
ssl,
|
||||
system,
|
||||
conn: TestServer::get_conn(),
|
||||
thread: Some(join),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Test application helper for testing request handlers.
|
||||
pub struct TestApp<S=()> {
|
||||
|
@@ -209,6 +209,15 @@ impl Client {
|
||||
self
|
||||
}
|
||||
|
||||
/// Set websocket handshake timeout
|
||||
///
|
||||
/// Handshake timeout is a total time for successful handshake.
|
||||
/// Default value is 5 seconds.
|
||||
pub fn timeout(mut self, timeout: Duration) -> Self {
|
||||
self.request.timeout(timeout);
|
||||
self
|
||||
}
|
||||
|
||||
/// Connect to websocket server and do ws handshake
|
||||
pub fn connect(&mut self) -> ClientHandshake {
|
||||
if let Some(e) = self.err.take() {
|
||||
@@ -445,16 +454,14 @@ impl Stream for ClientReader {
|
||||
// read
|
||||
match Frame::parse(&mut inner.rx, false, max_size) {
|
||||
Ok(Async::Ready(Some(frame))) => {
|
||||
let (finished, opcode, payload) = frame.unpack();
|
||||
|
||||
// continuation is not supported
|
||||
if !finished {
|
||||
inner.closed = true;
|
||||
return Err(ProtocolError::NoContinuation)
|
||||
}
|
||||
let (_finished, opcode, payload) = frame.unpack();
|
||||
|
||||
match opcode {
|
||||
OpCode::Continue => unimplemented!(),
|
||||
// continuation is not supported
|
||||
OpCode::Continue => {
|
||||
inner.closed = true;
|
||||
Err(ProtocolError::NoContinuation)
|
||||
},
|
||||
OpCode::Bad => {
|
||||
inner.closed = true;
|
||||
Err(ProtocolError::BadOpCode)
|
||||
|
@@ -205,7 +205,7 @@ impl<A, S> ActorHttpContext for WebsocketContext<A, S> where A: Actor<Context=Se
|
||||
};
|
||||
|
||||
if self.inner.alive() && self.inner.poll(ctx).is_err() {
|
||||
return Err(ErrorInternalServerError("error").into())
|
||||
return Err(ErrorInternalServerError("error"))
|
||||
}
|
||||
|
||||
// frames
|
||||
|
31
tests/cert.pem
Normal file
31
tests/cert.pem
Normal file
@@ -0,0 +1,31 @@
|
||||
-----BEGIN CERTIFICATE-----
|
||||
MIIFPjCCAyYCCQDvLYiYD+jqeTANBgkqhkiG9w0BAQsFADBhMQswCQYDVQQGEwJV
|
||||
UzELMAkGA1UECAwCQ0ExCzAJBgNVBAcMAlNGMRAwDgYDVQQKDAdDb21wYW55MQww
|
||||
CgYDVQQLDANPcmcxGDAWBgNVBAMMD3d3dy5leGFtcGxlLmNvbTAeFw0xODAxMjUx
|
||||
NzQ2MDFaFw0xOTAxMjUxNzQ2MDFaMGExCzAJBgNVBAYTAlVTMQswCQYDVQQIDAJD
|
||||
QTELMAkGA1UEBwwCU0YxEDAOBgNVBAoMB0NvbXBhbnkxDDAKBgNVBAsMA09yZzEY
|
||||
MBYGA1UEAwwPd3d3LmV4YW1wbGUuY29tMIICIjANBgkqhkiG9w0BAQEFAAOCAg8A
|
||||
MIICCgKCAgEA2WzIA2IpVR9Tb9EFhITlxuhE5rY2a3S6qzYNzQVgSFggxXEPn8k1
|
||||
sQEcer5BfAP986Sck3H0FvB4Bt/I8PwOtUCmhwcc8KtB5TcGPR4fjXnrpC+MIK5U
|
||||
NLkwuyBDKziYzTdBj8kUFX1WxmvEHEgqToPOZfBgsS71cJAR/zOWraDLSRM54jXy
|
||||
voLZN4Ti9rQagQrvTQ44Vz5ycDQy7UxtbUGh1CVv69vNVr7/SOOh/Nw5FNOZWLWr
|
||||
odGyoec5wh9iqRZgRqiTUc6Lt7V2RWc2X2gjwST2UfI+U46Ip3oaQ7ZD4eAkoqND
|
||||
xdniBZAykVG3c/99ux4BAESTF8fsNch6UticBxYMuTu+ouvP0psfI9wwwNliJDmA
|
||||
CRUTB9AgRynbL1AzhqQoDfsb98IZfjfNOpwnwuLwpMAPhbgd5KNdZaIJ4Hb6/stI
|
||||
yFElOExxd3TAxF2Gshd/lq1JcNHAZ1DSXV5MvOWT/NWgXwbIzUgQ8eIi+HuDYX2U
|
||||
UuaB6R8tbd52H7rbUv6HrfinuSlKWqjSYLkiKHkwUpoMw8y9UycRSzs1E9nPwPTO
|
||||
vRXb0mNCQeBCV9FvStNVXdCUTT8LGPv87xSD2pmt7LijlE6mHLG8McfcWkzA69un
|
||||
CEHIFAFDimTuN7EBljc119xWFTcHMyoZAfFF+oTqwSbBGImruCxnaJECAwEAATAN
|
||||
BgkqhkiG9w0BAQsFAAOCAgEApavsgsn7SpPHfhDSN5iZs1ILZQRewJg0Bty0xPfk
|
||||
3tynSW6bNH3nSaKbpsdmxxomthNSQgD2heOq1By9YzeOoNR+7Pk3s4FkASnf3ToI
|
||||
JNTUasBFFfaCG96s4Yvs8KiWS/k84yaWuU8c3Wb1jXs5Rv1qE1Uvuwat1DSGXSoD
|
||||
JNluuIkCsC4kWkyq5pWCGQrabWPRTWsHwC3PTcwSRBaFgYLJaR72SloHB1ot02zL
|
||||
d2age9dmFRFLLCBzP+D7RojBvL37qS/HR+rQ4SoQwiVc/JzaeqSe7ZbvEH9sZYEu
|
||||
ALowJzgbwro7oZflwTWunSeSGDSltkqKjvWvZI61pwfHKDahUTmZ5h2y67FuGEaC
|
||||
CIOUI8dSVSPKITxaq3JL4ze2e9/0Lt7hj19YK2uUmtMAW5Tirz4Yx5lyGH9U8Wur
|
||||
y/X8VPxTc4A9TMlJgkyz0hqvhbPOT/zSWB10zXh0glKAsSBryAOEDxV1UygmSir7
|
||||
YV8Qaq+oyKUTMc1MFq5vZ07M51EPaietn85t8V2Y+k/8XYltRp32NxsypxAJuyxh
|
||||
g/ko6RVTrWa1sMvz/F9LFqAdKiK5eM96lh9IU4xiLg4ob8aS/GRAA8oIFkZFhLrt
|
||||
tOwjIUPmEPyHWFi8dLpNuQKYalLYhuwZftG/9xV+wqhKGZO9iPrpHSYBRTap8w2y
|
||||
1QU=
|
||||
-----END CERTIFICATE-----
|
51
tests/key.pem
Normal file
51
tests/key.pem
Normal file
@@ -0,0 +1,51 @@
|
||||
-----BEGIN RSA PRIVATE KEY-----
|
||||
MIIJKAIBAAKCAgEA2WzIA2IpVR9Tb9EFhITlxuhE5rY2a3S6qzYNzQVgSFggxXEP
|
||||
n8k1sQEcer5BfAP986Sck3H0FvB4Bt/I8PwOtUCmhwcc8KtB5TcGPR4fjXnrpC+M
|
||||
IK5UNLkwuyBDKziYzTdBj8kUFX1WxmvEHEgqToPOZfBgsS71cJAR/zOWraDLSRM5
|
||||
4jXyvoLZN4Ti9rQagQrvTQ44Vz5ycDQy7UxtbUGh1CVv69vNVr7/SOOh/Nw5FNOZ
|
||||
WLWrodGyoec5wh9iqRZgRqiTUc6Lt7V2RWc2X2gjwST2UfI+U46Ip3oaQ7ZD4eAk
|
||||
oqNDxdniBZAykVG3c/99ux4BAESTF8fsNch6UticBxYMuTu+ouvP0psfI9wwwNli
|
||||
JDmACRUTB9AgRynbL1AzhqQoDfsb98IZfjfNOpwnwuLwpMAPhbgd5KNdZaIJ4Hb6
|
||||
/stIyFElOExxd3TAxF2Gshd/lq1JcNHAZ1DSXV5MvOWT/NWgXwbIzUgQ8eIi+HuD
|
||||
YX2UUuaB6R8tbd52H7rbUv6HrfinuSlKWqjSYLkiKHkwUpoMw8y9UycRSzs1E9nP
|
||||
wPTOvRXb0mNCQeBCV9FvStNVXdCUTT8LGPv87xSD2pmt7LijlE6mHLG8McfcWkzA
|
||||
69unCEHIFAFDimTuN7EBljc119xWFTcHMyoZAfFF+oTqwSbBGImruCxnaJECAwEA
|
||||
AQKCAgAME3aoeXNCPxMrSri7u4Xnnk71YXl0Tm9vwvjRQlMusXZggP8VKN/KjP0/
|
||||
9AE/GhmoxqPLrLCZ9ZE1EIjgmZ9Xgde9+C8rTtfCG2RFUL7/5J2p6NonlocmxoJm
|
||||
YkxYwjP6ce86RTjQWL3RF3s09u0inz9/efJk5O7M6bOWMQ9VZXDlBiRY5BYvbqUR
|
||||
6FeSzD4MnMbdyMRoVBeXE88gTvZk8xhB6DJnLzYgc0tKiRoeKT0iYv5JZw25VyRM
|
||||
ycLzfTrFmXCPfB1ylb483d9Ly4fBlM8nkx37PzEnAuukIawDxsPOb9yZC+hfvNJI
|
||||
7NFiMN+3maEqG2iC00w4Lep4skHY7eHUEUMl+Wjr+koAy2YGLWAwHZQTm7iXn9Ab
|
||||
L6adL53zyCKelRuEQOzbeosJAqS+5fpMK0ekXyoFIuskj7bWuIoCX7K/kg6q5IW+
|
||||
vC2FrlsrbQ79GztWLVmHFO1I4J9M5r666YS0qdh8c+2yyRl4FmSiHfGxb3eOKpxQ
|
||||
b6uI97iZlkxPF9LYUCSc7wq0V2gGz+6LnGvTHlHrOfVXqw/5pLAKhXqxvnroDTwz
|
||||
0Ay/xFF6ei/NSxBY5t8ztGCBm45wCU3l8pW0X6dXqwUipw5b4MRy1VFRu6rqlmbL
|
||||
OPSCuLxqyqsigiEYsBgS/icvXz9DWmCQMPd2XM9YhsHvUq+R4QKCAQEA98EuMMXI
|
||||
6UKIt1kK2t/3OeJRyDd4iv/fCMUAnuPjLBvFE4cXD/SbqCxcQYqb+pue3PYkiTIC
|
||||
71rN8OQAc5yKhzmmnCE5N26br/0pG4pwEjIr6mt8kZHmemOCNEzvhhT83nfKmV0g
|
||||
9lNtuGEQMiwmZrpUOF51JOMC39bzcVjYX2Cmvb7cFbIq3lR0zwM+aZpQ4P8LHCIu
|
||||
bgHmwbdlkLyIULJcQmHIbo6nPFB3ZZE4mqmjwY+rA6Fh9rgBa8OFCfTtrgeYXrNb
|
||||
IgZQ5U8GoYRPNC2ot0vpTinraboa/cgm6oG4M7FW1POCJTl+/ktHEnKuO5oroSga
|
||||
/BSg7hCNFVaOhwKCAQEA4Kkys0HtwEbV5mY/NnvUD5KwfXX7BxoXc9lZ6seVoLEc
|
||||
KjgPYxqYRVrC7dB2YDwwp3qcRTi/uBAgFNm3iYlDzI4xS5SeaudUWjglj7BSgXE2
|
||||
iOEa7EwcvVPluLaTgiWjlzUKeUCNNHWSeQOt+paBOT+IgwRVemGVpAgkqQzNh/nP
|
||||
tl3p9aNtgzEm1qVlPclY/XUCtf3bcOR+z1f1b4jBdn0leu5OhnxkC+Htik+2fTXD
|
||||
jt6JGrMkanN25YzsjnD3Sn+v6SO26H99wnYx5oMSdmb8SlWRrKtfJHnihphjG/YY
|
||||
l1cyorV6M/asSgXNQfGJm4OuJi0I4/FL2wLUHnU+JwKCAQEAzh4WipcRthYXXcoj
|
||||
gMKRkMOb3GFh1OpYqJgVExtudNTJmZxq8GhFU51MR27Eo7LycMwKy2UjEfTOnplh
|
||||
Us2qZiPtW7k8O8S2m6yXlYUQBeNdq9IuuYDTaYD94vsazscJNSAeGodjE+uGvb1q
|
||||
1wLqE87yoE7dUInYa1cOA3+xy2/CaNuviBFJHtzOrSb6tqqenQEyQf6h9/12+DTW
|
||||
t5pSIiixHrzxHiFqOoCLRKGToQB+71rSINwTf0nITNpGBWmSj5VcC3VV3TG5/XxI
|
||||
fPlxV2yhD5WFDPVNGBGvwPDSh4jSMZdZMSNBZCy4XWFNSKjGEWoK4DFYed3DoSt9
|
||||
5IG1YwKCAQA63ntHl64KJUWlkwNbboU583FF3uWBjee5VqoGKHhf3CkKMxhtGqnt
|
||||
+oN7t5VdUEhbinhqdx1dyPPvIsHCS3K1pkjqii4cyzNCVNYa2dQ00Qq+QWZBpwwc
|
||||
3GAkz8rFXsGIPMDa1vxpU6mnBjzPniKMcsZ9tmQDppCEpBGfLpio2eAA5IkK8eEf
|
||||
cIDB3CM0Vo94EvI76CJZabaE9IJ+0HIJb2+jz9BJ00yQBIqvJIYoNy9gP5Xjpi+T
|
||||
qV/tdMkD5jwWjHD3AYHLWKUGkNwwkAYFeqT/gX6jpWBP+ZRPOp011X3KInJFSpKU
|
||||
DT5GQ1Dux7EMTCwVGtXqjO8Ym5wjwwsfAoIBAEcxlhIW1G6BiNfnWbNPWBdh3v/K
|
||||
5Ln98Rcrz8UIbWyl7qNPjYb13C1KmifVG1Rym9vWMO3KuG5atK3Mz2yLVRtmWAVc
|
||||
fxzR57zz9MZFDun66xo+Z1wN3fVxQB4CYpOEI4Lb9ioX4v85hm3D6RpFukNtRQEc
|
||||
Gfr4scTjJX4jFWDp0h6ffMb8mY+quvZoJ0TJqV9L9Yj6Ksdvqez/bdSraev97bHQ
|
||||
4gbQxaTZ6WjaD4HjpPQefMdWp97Metg0ZQSS8b8EzmNFgyJ3XcjirzwliKTAQtn6
|
||||
I2sd0NCIooelrKRD8EJoDUwxoOctY7R97wpZ7/wEHU45cBCbRV3H4JILS5c=
|
||||
-----END RSA PRIVATE KEY-----
|
140
tests/test_ws.rs
140
tests/test_ws.rs
@@ -3,9 +3,14 @@ extern crate actix_web;
|
||||
extern crate futures;
|
||||
extern crate http;
|
||||
extern crate bytes;
|
||||
extern crate rand;
|
||||
|
||||
use bytes::Bytes;
|
||||
use futures::Stream;
|
||||
use rand::Rng;
|
||||
|
||||
#[cfg(feature="alpn")]
|
||||
extern crate openssl;
|
||||
|
||||
use actix_web::*;
|
||||
use actix::prelude::*;
|
||||
@@ -51,3 +56,138 @@ fn test_simple() {
|
||||
let (item, _) = srv.execute(reader.into_future()).unwrap();
|
||||
assert_eq!(item, Some(ws::Message::Close(ws::CloseCode::Normal)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_large_text() {
|
||||
let data = rand::thread_rng()
|
||||
.gen_ascii_chars()
|
||||
.take(65_536)
|
||||
.collect::<String>();
|
||||
|
||||
let mut srv = test::TestServer::new(
|
||||
|app| app.handler(|req| ws::start(req, Ws)));
|
||||
let (mut reader, mut writer) = srv.ws().unwrap();
|
||||
|
||||
for _ in 0..100 {
|
||||
writer.text(data.clone());
|
||||
let (item, r) = srv.execute(reader.into_future()).unwrap();
|
||||
reader = r;
|
||||
assert_eq!(item, Some(ws::Message::Text(data.clone())));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_large_bin() {
|
||||
let data = rand::thread_rng()
|
||||
.gen_ascii_chars()
|
||||
.take(65_536)
|
||||
.collect::<String>();
|
||||
|
||||
let mut srv = test::TestServer::new(
|
||||
|app| app.handler(|req| ws::start(req, Ws)));
|
||||
let (mut reader, mut writer) = srv.ws().unwrap();
|
||||
|
||||
for _ in 0..100 {
|
||||
writer.binary(data.clone());
|
||||
let (item, r) = srv.execute(reader.into_future()).unwrap();
|
||||
reader = r;
|
||||
assert_eq!(item, Some(ws::Message::Binary(Binary::from(data.clone()))));
|
||||
}
|
||||
}
|
||||
|
||||
struct Ws2 {
|
||||
count: usize,
|
||||
bin: bool,
|
||||
}
|
||||
|
||||
impl Actor for Ws2 {
|
||||
type Context = ws::WebsocketContext<Self>;
|
||||
|
||||
fn started(&mut self, ctx: &mut Self::Context) {
|
||||
self.send(ctx);
|
||||
}
|
||||
}
|
||||
|
||||
impl Ws2 {
|
||||
fn send(&mut self, ctx: &mut ws::WebsocketContext<Self>) {
|
||||
if self.bin {
|
||||
ctx.binary(Vec::from("0".repeat(65_536)));
|
||||
} else {
|
||||
ctx.text("0".repeat(65_536));
|
||||
}
|
||||
ctx.drain().and_then(|_, act, ctx| {
|
||||
act.count += 1;
|
||||
if act.count != 10_000 {
|
||||
act.send(ctx);
|
||||
}
|
||||
actix::fut::ok(())
|
||||
}).wait(ctx);
|
||||
}
|
||||
}
|
||||
|
||||
impl StreamHandler<ws::Message, ws::ProtocolError> for Ws2 {
|
||||
|
||||
fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
|
||||
match msg {
|
||||
ws::Message::Ping(msg) => ctx.pong(&msg),
|
||||
ws::Message::Text(text) => ctx.text(text),
|
||||
ws::Message::Binary(bin) => ctx.binary(bin),
|
||||
ws::Message::Close(reason) => ctx.close(reason, ""),
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_server_send_text() {
|
||||
let data = Some(ws::Message::Text("0".repeat(65_536)));
|
||||
|
||||
let mut srv = test::TestServer::new(
|
||||
|app| app.handler(|req| ws::start(req, Ws2{count:0, bin: false})));
|
||||
let (mut reader, _writer) = srv.ws().unwrap();
|
||||
|
||||
for _ in 0..10_000 {
|
||||
let (item, r) = srv.execute(reader.into_future()).unwrap();
|
||||
reader = r;
|
||||
assert_eq!(item, data);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_server_send_bin() {
|
||||
let data = Some(ws::Message::Binary(Binary::from("0".repeat(65_536))));
|
||||
|
||||
let mut srv = test::TestServer::new(
|
||||
|app| app.handler(|req| ws::start(req, Ws2{count:0, bin: true})));
|
||||
let (mut reader, _writer) = srv.ws().unwrap();
|
||||
|
||||
for _ in 0..10_000 {
|
||||
let (item, r) = srv.execute(reader.into_future()).unwrap();
|
||||
reader = r;
|
||||
assert_eq!(item, data);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg(feature="alpn")]
|
||||
fn test_ws_server_ssl() {
|
||||
extern crate openssl;
|
||||
use openssl::ssl::{SslMethod, SslAcceptor, SslFiletype};
|
||||
|
||||
// load ssl keys
|
||||
let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap();
|
||||
builder.set_private_key_file("tests/key.pem", SslFiletype::PEM).unwrap();
|
||||
builder.set_certificate_chain_file("tests/cert.pem").unwrap();
|
||||
|
||||
let mut srv = test::TestServer::build()
|
||||
.ssl(builder.build())
|
||||
.start(|app| app.handler(|req| ws::start(req, Ws2{count:0, bin: false})));
|
||||
let (mut reader, _writer) = srv.ws().unwrap();
|
||||
|
||||
let data = Some(ws::Message::Text("0".repeat(65_536)));
|
||||
for _ in 0..10_000 {
|
||||
let (item, r) = srv.execute(reader.into_future()).unwrap();
|
||||
reader = r;
|
||||
assert_eq!(item, data);
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user