1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-07-15 06:26:11 +02:00

Compare commits

...

27 Commits

Author SHA1 Message Date
2227120ae0 exclude examples 2018-01-21 09:09:19 -08:00
21c8c0371d travis config 2018-01-21 08:50:29 -08:00
1914a6a0d8 Always enable content encoding if encoding explicitly selected 2018-01-21 08:31:46 -08:00
1cff4619e7 reduce threshold for content encoding 2018-01-21 08:12:32 -08:00
7bb7adf89c relax InternalError constraints 2018-01-20 22:02:42 -08:00
f55ff24925 fix guide example 2018-01-20 21:40:18 -08:00
f5f78d79e6 update doc strings 2018-01-20 21:16:31 -08:00
9180625dfd refactor helper error types 2018-01-20 21:11:46 -08:00
552320bae2 add error logging guide section 2018-01-20 20:21:01 -08:00
7cf221f767 Log request processing errors 2018-01-20 20:12:24 -08:00
98931a8623 test case for broken transfer encoding 2018-01-20 16:51:18 -08:00
ae10a89014 use ws masking from tungstenite project 2018-01-20 16:47:34 -08:00
71d534dadb CORS middleware: allowed_headers is defaulting to None #50 2018-01-20 16:36:57 -08:00
867bb1d409 Merge branch 'master' of github.com:actix/actix-web 2018-01-20 16:12:51 -08:00
91c44a1cf1 Fix HEAD requests handling 2018-01-20 16:12:38 -08:00
3bc60a8d5d Merge pull request #53 from andreevlex/spelling-check-2
spelling check
2018-01-16 12:07:58 -08:00
58df8fa4b9 spelling check 2018-01-16 21:59:33 +03:00
81f92b43e5 Merge pull request #52 from andreevlex/spelling-check
spelling check
2018-01-15 14:16:54 -08:00
e1d9c3803b spelling check 2018-01-16 00:47:25 +03:00
a7c24aace1 flush is useless 2018-01-14 19:28:34 -08:00
89a89e7b18 refactor shared bytes api 2018-01-14 17:00:28 -08:00
3425f7be40 fix tests 2018-01-14 14:58:58 -08:00
09a6f8a34f disable alpn feature 2018-01-14 14:44:32 -08:00
7060f298b4 use more binary 2018-01-14 14:40:39 -08:00
33dbe15760 use Binary for writer trait 2018-01-14 13:50:38 -08:00
e95c7dfc29 use local actix-web for examples 2018-01-13 19:04:07 -08:00
927a92fcac impl HttpHandler for Box<HttpHandler> and add helper method Application::boxed() #49 2018-01-13 18:58:17 -08:00
45 changed files with 800 additions and 460 deletions

View File

@ -46,7 +46,9 @@ script:
cargo clean
USE_SKEPTIC=1 cargo test --features=alpn
else
cargo test --features=alpn
cargo clean
cargo test
# --features=alpn
fi
- |

View File

@ -1,5 +1,18 @@
# Changes
## 0.3.2 (2018-01-21)
* Fix HEAD requests handling
* Log request processing errors
* Always enable content encoding if encoding explicitly selected
* Allow multiple Applications on a single server with different state #49
* CORS middleware: allowed_headers is defaulting to None #50
## 0.3.1 (2018-01-13)
* Fix directory entry path #47

View File

@ -1,6 +1,6 @@
[package]
name = "actix-web"
version = "0.3.1"
version = "0.3.2"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix web framework"
readme = "README.md"
@ -12,7 +12,7 @@ categories = ["network-programming", "asynchronous",
"web-programming::http-server", "web-programming::websocket"]
license = "MIT/Apache-2.0"
exclude = [".gitignore", ".travis.yml", ".cargo/config",
"appveyor.yml", "./examples/static/*"]
"appveyor.yml", "/examples/**"]
build = "build.rs"
[badges]
@ -81,7 +81,7 @@ version = "0.9"
optional = true
[dev-dependencies]
env_logger = "0.4"
env_logger = "0.5"
reqwest = "0.8"
skeptic = "0.13"
serde_derive = "1.0"

View File

@ -6,6 +6,6 @@ workspace = "../.."
[dependencies]
futures = "*"
env_logger = "0.4"
env_logger = "0.5"
actix = "0.4"
actix-web = { path = "../../" }
actix-web = { path="../.." }

View File

@ -7,6 +7,7 @@ extern crate env_logger;
extern crate futures;
use futures::Stream;
use std::{io, env};
use actix_web::*;
use actix_web::middleware::RequestSession;
use futures::future::{FutureResult, result};
@ -56,17 +57,17 @@ fn index(mut req: HttpRequest) -> Result<HttpResponse> {
fn p404(req: HttpRequest) -> Result<HttpResponse> {
// html
let html = format!(r#"<!DOCTYPE html><html><head><title>actix - basics</title><link rel="shortcut icon" type="image/x-icon" href="/favicon.ico" /></head>
let html = r#"<!DOCTYPE html><html><head><title>actix - basics</title><link rel="shortcut icon" type="image/x-icon" href="/favicon.ico" /></head>
<body>
<a href="index.html">back to home</a>
<h1>404</h1>
</body>
</html>"#);
</html>"#;
// response
Ok(HttpResponse::build(StatusCode::NOT_FOUND)
.content_type("text/html; charset=utf-8")
.body(&html).unwrap())
.body(html).unwrap())
}
@ -92,8 +93,9 @@ fn with_param(req: HttpRequest) -> Result<HttpResponse>
}
fn main() {
::std::env::set_var("RUST_LOG", "actix_web=info");
let _ = env_logger::init();
env::set_var("RUST_LOG", "actix_web=debug");
env::set_var("RUST_BACKTRACE", "1");
env_logger::init();
let sys = actix::System::new("basic-example");
let addr = HttpServer::new(
@ -121,6 +123,9 @@ fn main() {
_ => httpcodes::HTTPNotFound,
}
}))
.resource("/error.html", |r| r.f(|req| {
error::ErrorBadRequest(io::Error::new(io::ErrorKind::Other, "test"))
}))
// static files
.handler("/static/", fs::StaticFiles::new("../static/", true))
// redirect

View File

@ -5,9 +5,9 @@ authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
workspace = "../.."
[dependencies]
env_logger = "0.4"
env_logger = "0.5"
actix = "0.4"
actix-web = { git = "https://github.com/actix/actix-web" }
actix-web = { path = "../../" }
futures = "0.1"
uuid = { version = "0.5", features = ["serde", "v4"] }

View File

@ -8,7 +8,7 @@ use diesel::prelude::*;
use models;
use schema;
/// This is db executor actor. We are going to run 3 of them in parallele.
/// This is db executor actor. We are going to run 3 of them in parallel.
pub struct DbExecutor(pub SqliteConnection);
/// This is only message that this actor can handle, but it is easy to extend number of

View File

@ -5,6 +5,6 @@ authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
workspace = "../.."
[dependencies]
env_logger = "0.4"
env_logger = "0.5"
actix = "0.4"
actix-web = { path = "../../" }

View File

@ -15,4 +15,4 @@ serde_derive = "1.0"
json = "*"
actix = "0.4"
actix-web = { git = "https://github.com/actix/actix-web" }
actix-web = { path="../../" }

View File

@ -12,4 +12,4 @@ path = "src/main.rs"
env_logger = "*"
futures = "0.1"
actix = "0.4"
actix-web = { git = "https://github.com/actix/actix-web" }
actix-web = { path="../../" }

View File

@ -6,6 +6,6 @@ workspace = "../.."
[dependencies]
futures = "*"
env_logger = "0.4"
env_logger = "0.5"
actix = "0.4"
actix-web = { git = "https://github.com/actix/actix-web" }
actix-web = { path = "../../" }

View File

@ -1,5 +1,5 @@
#![cfg_attr(feature="cargo-clippy", allow(needless_pass_by_value))]
//! There are two level of statfulness in actix-web. Application has state
//! There are two level of statefulness in actix-web. Application has state
//! that is shared across all handlers within same Application.
//! And individual handler can have state.
@ -33,7 +33,7 @@ struct MyWebSocket {
}
impl Actor for MyWebSocket {
type Context = HttpContext<Self, AppState>;
type Context = ws::WebsocketContext<Self, AppState>;
}
impl Handler<ws::Message> for MyWebSocket {
@ -43,9 +43,9 @@ impl Handler<ws::Message> for MyWebSocket {
self.counter += 1;
println!("WS({}): {:?}", self.counter, msg);
match msg {
ws::Message::Ping(msg) => ws::WsWriter::pong(ctx, &msg),
ws::Message::Text(text) => ws::WsWriter::text(ctx, &text),
ws::Message::Binary(bin) => ws::WsWriter::binary(ctx, bin),
ws::Message::Ping(msg) => ctx.pong(&msg),
ws::Message::Text(text) => ctx.text(&text),
ws::Message::Binary(bin) => ctx.binary(bin),
ws::Message::Closed | ws::Message::Error => {
ctx.stop();
}

View File

@ -5,7 +5,7 @@ authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
workspace = "../.."
[dependencies]
env_logger = "0.4"
env_logger = "0.5"
actix = "0.4"
actix-web = { git = "https://github.com/actix/actix-web" }
actix-web = { path = "../../" }
tera = "*"

View File

@ -9,6 +9,6 @@ name = "server"
path = "src/main.rs"
[dependencies]
env_logger = "0.4"
env_logger = "0.5"
actix = "^0.4.2"
actix-web = { git = "https://github.com/actix/actix-web", features=["alpn"] }
actix-web = { path = "../../", features=["alpn"] }

View File

@ -9,7 +9,7 @@ use std::io::Read;
use actix_web::*;
/// somple handle
/// simple handle
fn index(req: HttpRequest) -> Result<HttpResponse> {
println!("{:?}", req);
Ok(httpcodes::HTTPOk

View File

@ -26,4 +26,4 @@ serde_json = "1.0"
serde_derive = "1.0"
actix = "^0.4.2"
actix-web = { git = "https://github.com/actix/actix-web" }
actix-web = { path="../../" }

View File

@ -16,8 +16,8 @@ Chat server listens for incoming tcp connections. Server can access several type
* `\list` - list all available rooms
* `\join name` - join room, if room does not exist, create new one
* `\name name` - set session name
* `some message` - just string, send messsage to all peers in same room
* client has to send heartbeat `Ping` messages, if server does not receive a heartbeat message for 10 seconds connection gets droppped
* `some message` - just string, send message to all peers in same room
* client has to send heartbeat `Ping` messages, if server does not receive a heartbeat message for 10 seconds connection gets dropped
To start server use command: `cargo run --bin server`

View File

@ -16,7 +16,7 @@ use codec::{ChatRequest, ChatResponse, ChatCodec};
#[derive(Message)]
pub struct Message(pub String);
/// `ChatSession` actor is responsible for tcp peer communitions.
/// `ChatSession` actor is responsible for tcp peer communications.
pub struct ChatSession {
/// unique session id
id: usize,
@ -30,7 +30,7 @@ pub struct ChatSession {
impl Actor for ChatSession {
/// For tcp communication we are going to use `FramedContext`.
/// It is convinient wrapper around `Framed` object from `tokio_io`
/// It is convenient wrapper around `Framed` object from `tokio_io`
type Context = FramedContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
@ -149,7 +149,7 @@ impl ChatSession {
}
/// Define tcp server that will accept incomint tcp connection and create
/// Define tcp server that will accept incoming tcp connection and create
/// chat actors.
pub struct TcpServer {
chat: SyncAddress<ChatServer>,

View File

@ -134,3 +134,18 @@ fn index(req: HttpRequest) -> Result<&'static str> {
```
In this example *BAD REQUEST* response get generated for `MyError` error.
## Error logging
Actix logs all errors with `WARN` log level. If log level set to `DEBUG`
and `RUST_BACKTRACE` is enabled, backtrace get logged. The Error type uses
cause's error backtrace if available, if the underlying failure does not provide
a backtrace, a new backtrace is constructed pointing to that conversion point
(rather than the origin of the error). This construction only happens if there
is no underlying backtrace; if it does have a backtrace no new backtrace is constructed.
You can enable backtrace and debug logging with following command:
```
>> RUST_BACKTRACE=1 RUST_LOG=actix_web=debug cargo run
```

View File

@ -59,9 +59,11 @@ impl<S: 'static> PipelineHandler<S> for Inner<S> {
#[cfg(test)]
impl<S: 'static> HttpApplication<S> {
#[cfg(test)]
pub(crate) fn run(&mut self, req: HttpRequest<S>) -> Reply {
self.inner.borrow_mut().handle(req)
}
#[cfg(test)]
pub(crate) fn prepare_request(&self, req: HttpRequest) -> HttpRequest<S> {
req.with_state(Rc::clone(&self.state), self.router.clone())
}
@ -134,7 +136,7 @@ impl<S> Application<S> where S: 'static {
/// Create application with specific state. Application can be
/// configured with builder-like pattern.
///
/// State is shared with all reousrces within same application and could be
/// State is shared with all resources within same application and could be
/// accessed with `HttpRequest::state()` method.
pub fn with_state(state: S) -> Application<S> {
Application {
@ -154,7 +156,7 @@ impl<S> Application<S> where S: 'static {
/// Set application prefix
///
/// Only requests that matches application's prefix get processed by this application.
/// Application prefix always contains laading "/" slash. If supplied prefix
/// Application prefix always contains leading "/" slash. If supplied prefix
/// does not contain leading slash, it get inserted. Prefix should
/// consists valid path segments. i.e for application with
/// prefix `/app` any request with following paths `/app`, `/app/` or `/app/test`
@ -356,6 +358,40 @@ impl<S> Application<S> where S: 'static {
middlewares: Rc::new(parts.middlewares),
}
}
/// Convenience method for creating `Box<HttpHandler>` instance.
///
/// This method is useful if you need to register several application instances
/// with different state.
///
/// ```rust
/// # use std::thread;
/// # extern crate actix_web;
/// use actix_web::*;
///
/// struct State1;
///
/// struct State2;
///
/// fn main() {
/// # thread::spawn(|| {
/// HttpServer::new(|| { vec![
/// Application::with_state(State1)
/// .prefix("/app1")
/// .resource("/", |r| r.h(httpcodes::HTTPOk))
/// .boxed(),
/// Application::with_state(State2)
/// .prefix("/app2")
/// .resource("/", |r| r.h(httpcodes::HTTPOk))
/// .boxed() ]})
/// .bind("127.0.0.1:8080").unwrap()
/// .run()
/// # });
/// }
/// ```
pub fn boxed(mut self) -> Box<HttpHandler> {
Box::new(self.finish())
}
}
impl<S: 'static> IntoHttpHandler for Application<S> {

View File

@ -1,4 +1,4 @@
use std::fmt;
use std::{fmt, mem};
use std::rc::Rc;
use std::sync::Arc;
use bytes::{Bytes, BytesMut};
@ -31,7 +31,7 @@ pub enum Binary {
Bytes(Bytes),
/// Static slice
Slice(&'static [u8]),
/// Shared stirng body
/// Shared string body
SharedString(Rc<String>),
/// Shared string body
#[doc(hidden)]
@ -122,6 +122,22 @@ impl Binary {
pub fn from_slice(s: &[u8]) -> Binary {
Binary::Bytes(Bytes::from(s))
}
/// Convert Binary to a Bytes instance
pub fn take(&mut self) -> Bytes {
mem::replace(self, Binary::Slice(b"")).into()
}
}
impl Clone for Binary {
fn clone(&self) -> Binary {
match *self {
Binary::Bytes(ref bytes) => Binary::Bytes(bytes.clone()),
Binary::Slice(slice) => Binary::Bytes(Bytes::from(slice)),
Binary::SharedString(ref s) => Binary::Bytes(Bytes::from(s.as_str())),
Binary::ArcSharedString(ref s) => Binary::Bytes(Bytes::from(s.as_str())),
}
}
}
impl Into<Bytes> for Binary {

View File

@ -127,7 +127,7 @@ impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> {
}
}
/// Indicate end of streamimng payload. Also this method calls `Self::close`.
/// Indicate end of streaming payload. Also this method calls `Self::close`.
#[inline]
pub fn write_eof(&mut self) {
self.add_frame(Frame::Chunk(None));

View File

@ -9,8 +9,8 @@ use std::error::Error as StdError;
use cookie;
use httparse;
use failure::Fail;
use futures::Canceled;
use failure::{Fail, Backtrace};
use http2::Error as Http2Error;
use http::{header, StatusCode, Error as HttpError};
use http::uri::InvalidUriBytes;
@ -22,6 +22,8 @@ use url::ParseError as UrlParseError;
pub use cookie::{ParseError as CookieParseError};
use body::Body;
use handler::Responder;
use httprequest::HttpRequest;
use httpresponse::HttpResponse;
use httpcodes::{HTTPBadRequest, HTTPMethodNotAllowed, HTTPExpectationFailed};
@ -33,9 +35,9 @@ use httpcodes::{HTTPBadRequest, HTTPMethodNotAllowed, HTTPExpectationFailed};
pub type Result<T, E=Error> = result::Result<T, E>;
/// General purpose actix web error
#[derive(Fail, Debug)]
pub struct Error {
cause: Box<ResponseError>,
backtrace: Option<Backtrace>,
}
impl Error {
@ -64,6 +66,16 @@ impl fmt::Display for Error {
}
}
impl fmt::Debug for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if let Some(bt) = self.cause.backtrace() {
write!(f, "{:?}\n\n{:?}", &self.cause, bt)
} else {
write!(f, "{:?}\n\n{:?}", &self.cause, self.backtrace.as_ref().unwrap())
}
}
}
/// `HttpResponse` for `Error`
impl From<Error> for HttpResponse {
fn from(err: Error) -> Self {
@ -74,7 +86,12 @@ impl From<Error> for HttpResponse {
/// `Error` for any error that implements `ResponseError`
impl<T: ResponseError> From<T> for Error {
fn from(err: T) -> Error {
Error { cause: Box::new(err) }
let backtrace = if err.backtrace().is_none() {
Some(Backtrace::new())
} else {
None
};
Error { cause: Box::new(err), backtrace: backtrace }
}
}
@ -320,7 +337,7 @@ pub enum WsHandshakeError {
/// Only get method is allowed
#[fail(display="Method not allowed")]
GetMethodRequired,
/// Ugrade header if not set to websocket
/// Upgrade header if not set to websocket
#[fail(display="Websocket upgrade is expected")]
NoWebsocketUpgrade,
/// Connection header is not set to upgrade
@ -329,7 +346,7 @@ pub enum WsHandshakeError {
/// Websocket version header is not set
#[fail(display="Websocket version header is required")]
NoVersionHeader,
/// Unsupported websockt version
/// Unsupported websocket version
#[fail(display="Unsupported version")]
UnsupportedVersion,
/// Websocket key is not set or wrong
@ -478,39 +495,10 @@ impl From<UrlParseError> for UrlGenerationError {
}
}
macro_rules! ERROR_WRAP {
($type:ty, $status:expr) => {
unsafe impl<T> Sync for $type {}
unsafe impl<T> Send for $type {}
impl<T> $type {
pub fn cause(&self) -> &T {
&self.0
}
}
impl<T: fmt::Debug + 'static> Fail for $type {}
impl<T: fmt::Debug + 'static> fmt::Display for $type {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{:?}", self.0)
}
}
impl<T> ResponseError for $type
where T: Send + Sync + fmt::Debug + 'static,
{
fn error_response(&self) -> HttpResponse {
HttpResponse::new($status, Body::Empty)
}
}
}
}
/// Helper type that can wrap any error and generate *BAD REQUEST* response.
/// Helper type that can wrap any error and generate custom response.
///
/// In following example any `io::Error` will be converted into "BAD REQUEST" response
/// as oposite to *INNTERNAL SERVER ERROR* which is defined by default.
/// as opposite to *INNTERNAL SERVER ERROR* which is defined by default.
///
/// ```rust
/// # extern crate actix_web;
@ -523,59 +511,133 @@ macro_rules! ERROR_WRAP {
/// }
/// # fn main() {}
/// ```
#[derive(Debug)]
pub struct ErrorBadRequest<T>(pub T);
ERROR_WRAP!(ErrorBadRequest<T>, StatusCode::BAD_REQUEST);
pub struct InternalError<T> {
cause: T,
status: StatusCode,
backtrace: Backtrace,
}
#[derive(Debug)]
/// Helper type that can wrap any error and generate *UNAUTHORIZED* response.
pub struct ErrorUnauthorized<T>(pub T);
ERROR_WRAP!(ErrorUnauthorized<T>, StatusCode::UNAUTHORIZED);
unsafe impl<T> Sync for InternalError<T> {}
unsafe impl<T> Send for InternalError<T> {}
#[derive(Debug)]
/// Helper type that can wrap any error and generate *FORBIDDEN* response.
pub struct ErrorForbidden<T>(pub T);
ERROR_WRAP!(ErrorForbidden<T>, StatusCode::FORBIDDEN);
impl<T> InternalError<T> {
pub fn new(err: T, status: StatusCode) -> Self {
InternalError {
cause: err,
status: status,
backtrace: Backtrace::new(),
}
}
}
#[derive(Debug)]
/// Helper type that can wrap any error and generate *NOT FOUND* response.
pub struct ErrorNotFound<T>(pub T);
ERROR_WRAP!(ErrorNotFound<T>, StatusCode::NOT_FOUND);
impl<T> Fail for InternalError<T>
where T: Send + Sync + fmt::Debug + 'static
{
fn backtrace(&self) -> Option<&Backtrace> {
Some(&self.backtrace)
}
}
#[derive(Debug)]
/// Helper type that can wrap any error and generate *METHOD NOT ALLOWED* response.
pub struct ErrorMethodNotAllowed<T>(pub T);
ERROR_WRAP!(ErrorMethodNotAllowed<T>, StatusCode::METHOD_NOT_ALLOWED);
impl<T> fmt::Debug for InternalError<T>
where T: Send + Sync + fmt::Debug + 'static
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&self.cause, f)
}
}
#[derive(Debug)]
/// Helper type that can wrap any error and generate *REQUEST TIMEOUT* response.
pub struct ErrorRequestTimeout<T>(pub T);
ERROR_WRAP!(ErrorRequestTimeout<T>, StatusCode::REQUEST_TIMEOUT);
impl<T> fmt::Display for InternalError<T>
where T: Send + Sync + fmt::Debug + 'static
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&self.cause, f)
}
}
#[derive(Debug)]
/// Helper type that can wrap any error and generate *CONFLICT* response.
pub struct ErrorConflict<T>(pub T);
ERROR_WRAP!(ErrorConflict<T>, StatusCode::CONFLICT);
impl<T> ResponseError for InternalError<T>
where T: Send + Sync + fmt::Debug + 'static
{
fn error_response(&self) -> HttpResponse {
HttpResponse::new(self.status, Body::Empty)
}
}
#[derive(Debug)]
/// Helper type that can wrap any error and generate *GONE* response.
pub struct ErrorGone<T>(pub T);
ERROR_WRAP!(ErrorGone<T>, StatusCode::GONE);
impl<T> Responder for InternalError<T>
where T: Send + Sync + fmt::Debug + 'static
{
type Item = HttpResponse;
type Error = Error;
#[derive(Debug)]
/// Helper type that can wrap any error and generate *PRECONDITION FAILED* response.
pub struct ErrorPreconditionFailed<T>(pub T);
ERROR_WRAP!(ErrorPreconditionFailed<T>, StatusCode::PRECONDITION_FAILED);
fn respond_to(self, _: HttpRequest) -> Result<HttpResponse, Error> {
Err(self.into())
}
}
#[derive(Debug)]
/// Helper type that can wrap any error and generate *EXPECTATION FAILED* response.
pub struct ErrorExpectationFailed<T>(pub T);
ERROR_WRAP!(ErrorExpectationFailed<T>, StatusCode::EXPECTATION_FAILED);
/// Helper function that creates wrapper of any error and generate *BAD REQUEST* response.
#[allow(non_snake_case)]
pub fn ErrorBadRequest<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::BAD_REQUEST)
}
#[derive(Debug)]
/// Helper type that can wrap any error and generate *INTERNAL SERVER ERROR* response.
pub struct ErrorInternalServerError<T>(pub T);
ERROR_WRAP!(ErrorInternalServerError<T>, StatusCode::INTERNAL_SERVER_ERROR);
/// Helper function that creates wrapper of any error and generate *UNAUTHORIZED* response.
#[allow(non_snake_case)]
pub fn ErrorUnauthorized<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::UNAUTHORIZED)
}
/// Helper function that creates wrapper of any error and generate *FORBIDDEN* response.
#[allow(non_snake_case)]
pub fn ErrorForbidden<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::FORBIDDEN)
}
/// Helper function that creates wrapper of any error and generate *NOT FOUND* response.
#[allow(non_snake_case)]
pub fn ErrorNotFound<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::NOT_FOUND)
}
/// Helper function that creates wrapper of any error and generate *METHOD NOT ALLOWED* response.
#[allow(non_snake_case)]
pub fn ErrorMethodNotAllowed<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::METHOD_NOT_ALLOWED)
}
/// Helper function that creates wrapper of any error and generate *REQUEST TIMEOUT* response.
#[allow(non_snake_case)]
pub fn ErrorRequestTimeout<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::REQUEST_TIMEOUT)
}
/// Helper function that creates wrapper of any error and generate *CONFLICT* response.
#[allow(non_snake_case)]
pub fn ErrorConflict<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::CONFLICT)
}
/// Helper function that creates wrapper of any error and generate *GONE* response.
#[allow(non_snake_case)]
pub fn ErrorGone<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::GONE)
}
/// Helper function that creates wrapper of any error and generate *PRECONDITION FAILED* response.
#[allow(non_snake_case)]
pub fn ErrorPreconditionFailed<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::PRECONDITION_FAILED)
}
/// Helper function that creates wrapper of any error and generate *EXPECTATION FAILED* response.
#[allow(non_snake_case)]
pub fn ErrorExpectationFailed<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::EXPECTATION_FAILED)
}
/// Helper function that creates wrapper of any error and generate *INTERNAL SERVER ERROR* response.
#[allow(non_snake_case)]
pub fn ErrorInternalServerError<T>(err: T) -> InternalError<T> {
InternalError::new(err, StatusCode::INTERNAL_SERVER_ERROR)
}
#[cfg(test)]
mod tests {

View File

@ -9,7 +9,7 @@ use error::Error;
use httprequest::HttpRequest;
use httpresponse::HttpResponse;
/// Trait defines object that could be regestered as route handler
/// Trait defines object that could be registered as route handler
#[allow(unused_variables)]
pub trait Handler<S>: 'static {
@ -35,7 +35,7 @@ pub trait Responder {
}
#[doc(hidden)]
/// Convinience trait that convert `Future` object into `Boxed` future
/// Convenience trait that convert `Future` object into `Boxed` future
pub trait AsyncResponder<I, E>: Sized {
fn responder(self) -> Box<Future<Item=I, Error=E>>;
}
@ -193,7 +193,7 @@ impl<I, E> Responder for Box<Future<Item=I, Error=E>>
}
}
/// Trait defines object that could be regestered as resource route
/// Trait defines object that could be registered as resource route
pub(crate) trait RouteHandler<S>: 'static {
fn handle(&mut self, req: HttpRequest<S>) -> Reply;
}
@ -341,7 +341,7 @@ impl Default for NormalizePath {
}
impl NormalizePath {
/// Create new `NoramlizePath` instance
/// Create new `NormalizePath` instance
pub fn new(append: bool, merge: bool, redirect: StatusCode) -> NormalizePath {
NormalizePath {
append: append,

View File

@ -66,84 +66,6 @@ impl fmt::Write for CachedDate {
}
}
/// Internal use only! unsafe
#[derive(Debug)]
pub(crate) struct SharedBytesPool(RefCell<VecDeque<Rc<BytesMut>>>);
impl SharedBytesPool {
pub fn new() -> SharedBytesPool {
SharedBytesPool(RefCell::new(VecDeque::with_capacity(128)))
}
pub fn get_bytes(&self) -> Rc<BytesMut> {
if let Some(bytes) = self.0.borrow_mut().pop_front() {
bytes
} else {
Rc::new(BytesMut::new())
}
}
pub fn release_bytes(&self, mut bytes: Rc<BytesMut>) {
let v = &mut self.0.borrow_mut();
if v.len() < 128 {
Rc::get_mut(&mut bytes).unwrap().take();
v.push_front(bytes);
}
}
}
#[derive(Debug)]
pub(crate) struct SharedBytes(
Option<Rc<BytesMut>>, Option<Rc<SharedBytesPool>>);
impl Drop for SharedBytes {
fn drop(&mut self) {
if let Some(ref pool) = self.1 {
if let Some(bytes) = self.0.take() {
if Rc::strong_count(&bytes) == 1 {
pool.release_bytes(bytes);
}
}
}
}
}
impl SharedBytes {
pub fn empty() -> Self {
SharedBytes(None, None)
}
pub fn new(bytes: Rc<BytesMut>, pool: Rc<SharedBytesPool>) -> SharedBytes {
SharedBytes(Some(bytes), Some(pool))
}
#[inline(always)]
#[allow(mutable_transmutes)]
#[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref, inline_always))]
pub fn get_mut(&self) -> &mut BytesMut {
let r: &BytesMut = self.0.as_ref().unwrap().as_ref();
unsafe{mem::transmute(r)}
}
#[inline]
pub fn get_ref(&self) -> &BytesMut {
self.0.as_ref().unwrap()
}
}
impl Default for SharedBytes {
fn default() -> Self {
SharedBytes(Some(Rc::new(BytesMut::new())), None)
}
}
impl Clone for SharedBytes {
fn clone(&self) -> SharedBytes {
SharedBytes(self.0.clone(), self.1.clone())
}
}
/// Internal use only! unsafe
pub(crate) struct SharedMessagePool(RefCell<VecDeque<Rc<HttpMessage>>>);

View File

@ -222,7 +222,7 @@ impl<S> HttpRequest<S> {
self.uri().path()
}
/// Get *ConnectionInfo* for currect request.
/// Get *ConnectionInfo* for correct request.
pub fn connection_info(&self) -> &ConnectionInfo {
if self.as_ref().info.is_none() {
let info: ConnectionInfo<'static> = unsafe{
@ -278,7 +278,7 @@ impl<S> HttpRequest<S> {
/// Peer socket address
///
/// Peer address is actuall socket address, if proxy is used in front of
/// Peer address is actual socket address, if proxy is used in front of
/// actix http server, then peer address would be address of this proxy.
///
/// To get client connection information `connection_info()` method should be used.

View File

@ -164,8 +164,8 @@ impl HttpResponse {
/// Content encoding
#[inline]
pub fn content_encoding(&self) -> &ContentEncoding {
&self.get_ref().encoding
pub fn content_encoding(&self) -> ContentEncoding {
self.get_ref().encoding
}
/// Set content encoding
@ -423,8 +423,8 @@ impl HttpResponseBuilder {
}
/// This method calls provided closure with builder reference if value is Some.
pub fn if_some<T, F>(&mut self, value: Option<&T>, f: F) -> &mut Self
where F: FnOnce(&T, &mut HttpResponseBuilder)
pub fn if_some<T, F>(&mut self, value: Option<T>, f: F) -> &mut Self
where F: FnOnce(T, &mut HttpResponseBuilder)
{
if let Some(val) = value {
f(val, self);
@ -812,11 +812,11 @@ mod tests {
#[test]
fn test_content_encoding() {
let resp = HttpResponse::build(StatusCode::OK).finish().unwrap();
assert_eq!(*resp.content_encoding(), ContentEncoding::Auto);
assert_eq!(resp.content_encoding(), ContentEncoding::Auto);
let resp = HttpResponse::build(StatusCode::OK)
.content_encoding(ContentEncoding::Br).finish().unwrap();
assert_eq!(*resp.content_encoding(), ContentEncoding::Br);
assert_eq!(resp.content_encoding(), ContentEncoding::Br);
}
#[test]

View File

@ -214,7 +214,7 @@ impl Cors {
/// This method register cors middleware with resource and
/// adds route for *OPTIONS* preflight requests.
///
/// It is possible to register *Cors* middlware with `Resource::middleware()`
/// It is possible to register *Cors* middleware with `Resource::middleware()`
/// method, but in that case *Cors* middleware wont be able to handle *OPTIONS*
/// requests.
pub fn register<S: 'static>(self, resource: &mut Resource<S>) {
@ -295,16 +295,23 @@ impl<S> Middleware<S> for Cors {
self.validate_allowed_method(req)?;
self.validate_allowed_headers(req)?;
// allowed headers
let headers = if let Some(headers) = self.headers.as_ref() {
Some(HeaderValue::try_from(&headers.iter().fold(
String::new(), |s, v| s + "," + v.as_str()).as_str()[1..]).unwrap())
} else if let Some(hdr) = req.headers().get(header::ACCESS_CONTROL_REQUEST_HEADERS) {
Some(hdr.clone())
} else {
None
};
Ok(Started::Response(
HTTPOk.build()
.if_some(self.max_age.as_ref(), |max_age, resp| {
let _ = resp.header(
header::ACCESS_CONTROL_MAX_AGE, format!("{}", max_age).as_str());})
.if_some(self.headers.as_ref(), |headers, resp| {
let _ = resp.header(
header::ACCESS_CONTROL_ALLOW_HEADERS,
&headers.iter().fold(
String::new(), |s, v| s + "," + v.as_str()).as_str()[1..]);})
.if_some(headers, |headers, resp| {
let _ = resp.header(header::ACCESS_CONTROL_ALLOW_HEADERS, headers); })
.if_true(self.origins.is_all(), |resp| {
if self.send_wildcard {
resp.header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*");

View File

@ -217,7 +217,7 @@ pub struct CookieSession {
inner: Rc<CookieSessionInner>,
}
/// Errors that can occure during handling cookie session
/// Errors that can occur during handling cookie session
#[derive(Fail, Debug)]
pub enum CookieSessionError {
/// Size of the serialized session is greater than 4000 bytes.

View File

@ -6,7 +6,7 @@ use std::slice::Iter;
use std::borrow::Cow;
use smallvec::SmallVec;
use error::{ResponseError, UriSegmentError, ErrorBadRequest};
use error::{ResponseError, UriSegmentError, InternalError, ErrorBadRequest};
/// A trait to abstract the idea of creating a new instance of a type from a path parameter.
@ -77,7 +77,7 @@ impl<'a> Params<'a> {
}
}
/// Return iterator to items in paramter container
/// Return iterator to items in parameter container
pub fn iter(&self) -> Iter<(Cow<'a, str>, Cow<'a, str>)> {
self.0.iter()
}
@ -141,7 +141,7 @@ impl FromParam for PathBuf {
macro_rules! FROM_STR {
($type:ty) => {
impl FromParam for $type {
type Err = ErrorBadRequest<<$type as FromStr>::Err>;
type Err = InternalError<<$type as FromStr>::Err>;
fn from_param(val: &str) -> Result<Self, Self::Err> {
<$type as FromStr>::from_str(val).map_err(ErrorBadRequest)

View File

@ -3,6 +3,7 @@ use std::rc::Rc;
use std::cell::RefCell;
use std::marker::PhantomData;
use log::Level::Debug;
use futures::{Async, Poll, Future, Stream};
use futures::unsync::oneshot;
@ -56,7 +57,7 @@ impl<S: 'static, H: PipelineHandler<S>> PipelineState<S, H> {
struct PipelineInfo<S> {
req: HttpRequest<S>,
count: usize,
count: u16,
mws: Rc<Vec<Box<Middleware<S>>>>,
context: Option<Box<ActorHttpContext>>,
error: Option<Error>,
@ -210,14 +211,14 @@ impl<S: 'static, H: PipelineHandler<S>> StartMiddlewares<S, H> {
fn init(info: &mut PipelineInfo<S>, handler: Rc<RefCell<H>>) -> PipelineState<S, H> {
// execute middlewares, we need this stage because middlewares could be non-async
// and we can move to next state immidietly
let len = info.mws.len();
// and we can move to next state immediately
let len = info.mws.len() as u16;
loop {
if info.count == len {
let reply = handler.borrow_mut().handle(info.req.clone());
return WaitingResponse::init(info, reply)
} else {
match info.mws[info.count].start(&mut info.req) {
match info.mws[info.count as usize].start(&mut info.req) {
Ok(Started::Done) =>
info.count += 1,
Ok(Started::Response(resp)) =>
@ -246,7 +247,7 @@ impl<S: 'static, H: PipelineHandler<S>> StartMiddlewares<S, H> {
}
fn poll(&mut self, info: &mut PipelineInfo<S>) -> Option<PipelineState<S, H>> {
let len = info.mws.len();
let len = info.mws.len() as u16;
'outer: loop {
match self.fut.as_mut().unwrap().poll() {
Ok(Async::NotReady) => return None,
@ -260,7 +261,7 @@ impl<S: 'static, H: PipelineHandler<S>> StartMiddlewares<S, H> {
return Some(WaitingResponse::init(info, reply));
} else {
loop {
match info.mws[info.count].start(info.req_mut()) {
match info.mws[info.count as usize].start(info.req_mut()) {
Ok(Started::Done) =>
info.count += 1,
Ok(Started::Response(resp)) => {
@ -334,7 +335,7 @@ impl<S: 'static, H> RunMiddlewares<S, H> {
loop {
resp = match info.mws[curr].response(info.req_mut(), resp) {
Err(err) => {
info.count = curr + 1;
info.count = (curr + 1) as u16;
return ProcessResponse::init(err.into())
}
Ok(Response::Done(r)) => {
@ -458,6 +459,13 @@ impl<S: 'static, H> ProcessResponse<S, H> {
}
};
if let Some(err) = self.resp.error() {
warn!("Error occured during request handling: {}", err);
if log_enabled!(Debug) {
debug!("{:?}", err);
}
}
match self.resp.replace_body(Body::Empty) {
Body::Streaming(stream) =>
self.iostate = IOState::Payload(stream),
@ -480,7 +488,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
},
Ok(Async::Ready(Some(chunk))) => {
self.iostate = IOState::Payload(body);
match io.write(chunk.as_ref()) {
match io.write(chunk.into()) {
Err(err) => {
info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(info, self.resp))
@ -522,7 +530,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
break 'outter
},
Frame::Chunk(Some(chunk)) => {
match io.write(chunk.as_ref()) {
match io.write(chunk) {
Err(err) => {
info.error = Some(err.into());
return Ok(
@ -575,16 +583,6 @@ impl<S: 'static, H> ProcessResponse<S, H> {
if self.running == RunningState::Paused || self.drain.is_some() {
match io.poll_completed(false) {
Ok(Async::Ready(_)) => {
match io.flush() {
Ok(Async::Ready(_)) => (),
Ok(Async::NotReady) => return Err(PipelineState::Response(self)),
Err(err) => {
debug!("Error sending data: {}", err);
info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(info, self.resp))
}
}
self.running.resume();
// resolve drain futures
@ -596,7 +594,6 @@ impl<S: 'static, H> ProcessResponse<S, H> {
},
Ok(Async::NotReady) => return Err(PipelineState::Response(self)),
Err(err) => {
debug!("Error sending data: {}", err);
info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(info, self.resp))
}
@ -609,7 +606,6 @@ impl<S: 'static, H> ProcessResponse<S, H> {
match io.write_eof() {
Ok(_) => (),
Err(err) => {
debug!("Error sending data: {}", err);
info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(info, self.resp))
}
@ -671,7 +667,7 @@ impl<S: 'static, H> FinishingMiddlewares<S, H> {
self.fut = None;
info.count -= 1;
match info.mws[info.count].finish(info.req_mut(), &self.resp) {
match info.mws[info.count as usize].finish(info.req_mut(), &self.resp) {
Finished::Done => {
if info.count == 0 {
return Some(Completed::init(info))
@ -692,6 +688,10 @@ impl<S, H> Completed<S, H> {
#[inline]
fn init(info: &mut PipelineInfo<S>) -> PipelineState<S, H> {
if let Some(ref err) = info.error {
error!("Error occured during request handling: {}", err);
}
if info.context.is_none() {
PipelineState::None
} else {

View File

@ -19,7 +19,7 @@ use httpresponse::HttpResponse;
/// Route uses builder-like pattern for configuration.
/// During request handling, resource object iterate through all routes
/// and check all predicates for specific route, if request matches all predicates route
/// route considired matched and route handler get called.
/// route considered matched and route handler get called.
///
/// ```rust
/// # extern crate actix_web;

View File

@ -3,7 +3,7 @@ use std::io::{Read, Write};
use std::fmt::Write as FmtWrite;
use std::str::FromStr;
use http::Version;
use http::{Version, Method, HttpTryFrom};
use http::header::{HeaderMap, HeaderValue,
ACCEPT_ENCODING, CONNECTION,
CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING};
@ -16,11 +16,13 @@ use bytes::{Bytes, BytesMut, BufMut, Writer};
use headers::ContentEncoding;
use body::{Body, Binary};
use error::PayloadError;
use helpers::SharedBytes;
use httprequest::HttpMessage;
use httpresponse::HttpResponse;
use payload::{PayloadSender, PayloadWriter};
use super::shared::SharedBytes;
impl ContentEncoding {
#[inline]
@ -344,15 +346,17 @@ impl PayloadEncoder {
pub fn new(buf: SharedBytes, req: &HttpMessage, resp: &mut HttpResponse) -> PayloadEncoder {
let version = resp.version().unwrap_or_else(|| req.version);
let mut body = resp.replace_body(Body::Empty);
let response_encoding = resp.content_encoding();
let has_body = match body {
Body::Empty => false,
Body::Binary(ref bin) => bin.len() >= 512,
Body::Binary(ref bin) =>
!(response_encoding == ContentEncoding::Auto && bin.len() < 96),
_ => true,
};
// Enable content encoding only if response does not contain Content-Encoding header
let mut encoding = if has_body {
let encoding = match *resp.content_encoding() {
let encoding = match response_encoding {
ContentEncoding::Auto => {
// negotiate content-encoding
if let Some(val) = req.headers.get(ACCEPT_ENCODING) {
@ -376,10 +380,12 @@ impl PayloadEncoder {
ContentEncoding::Identity
};
let transfer = match body {
let mut transfer = match body {
Body::Empty => {
resp.headers_mut().remove(CONTENT_LENGTH);
TransferEncoding::eof(buf)
if req.method != Method::HEAD {
resp.headers_mut().remove(CONTENT_LENGTH);
}
TransferEncoding::length(0, buf)
},
Body::Binary(ref mut bytes) => {
if encoding.is_compression() {
@ -396,13 +402,20 @@ impl PayloadEncoder {
ContentEncoding::Auto => unreachable!()
};
// TODO return error!
let _ = enc.write(bytes.as_ref());
let _ = enc.write(bytes.clone());
let _ = enc.write_eof();
*bytes = Binary::from(tmp.get_mut().take());
*bytes = Binary::from(tmp.take());
encoding = ContentEncoding::Identity;
}
resp.headers_mut().remove(CONTENT_LENGTH);
if req.method == Method::HEAD {
let mut b = BytesMut::new();
let _ = write!(b, "{}", bytes.len());
resp.headers_mut().insert(
CONTENT_LENGTH, HeaderValue::try_from(b.freeze()).unwrap());
} else {
resp.headers_mut().remove(CONTENT_LENGTH);
}
TransferEncoding::eof(buf)
}
Body::Streaming(_) | Body::Actor(_) => {
@ -423,7 +436,12 @@ impl PayloadEncoder {
}
}
};
resp.replace_body(body);
//
if req.method == Method::HEAD {
transfer.kind = TransferEncodingKind::Length(0);
} else {
resp.replace_body(body);
}
PayloadEncoder(
match encoding {
@ -503,16 +521,6 @@ impl PayloadEncoder {
impl PayloadEncoder {
#[inline]
pub fn len(&self) -> usize {
self.0.get_ref().len()
}
#[inline]
pub fn get_mut(&mut self) -> &mut BytesMut {
self.0.get_mut()
}
#[inline]
pub fn is_eof(&self) -> bool {
self.0.is_eof()
@ -520,7 +528,7 @@ impl PayloadEncoder {
#[cfg_attr(feature = "cargo-clippy", allow(inline_always))]
#[inline(always)]
pub fn write(&mut self, payload: &[u8]) -> Result<(), io::Error> {
pub fn write(&mut self, payload: Binary) -> Result<(), io::Error> {
self.0.write(payload)
}
@ -543,42 +551,10 @@ impl ContentEncoder {
#[inline]
pub fn is_eof(&self) -> bool {
match *self {
ContentEncoder::Br(ref encoder) =>
encoder.get_ref().is_eof(),
ContentEncoder::Deflate(ref encoder) =>
encoder.get_ref().is_eof(),
ContentEncoder::Gzip(ref encoder) =>
encoder.get_ref().is_eof(),
ContentEncoder::Identity(ref encoder) =>
encoder.is_eof(),
}
}
#[inline]
pub fn get_ref(&self) -> &BytesMut {
match *self {
ContentEncoder::Br(ref encoder) =>
encoder.get_ref().buffer.get_ref(),
ContentEncoder::Deflate(ref encoder) =>
encoder.get_ref().buffer.get_ref(),
ContentEncoder::Gzip(ref encoder) =>
encoder.get_ref().buffer.get_ref(),
ContentEncoder::Identity(ref encoder) =>
encoder.buffer.get_ref(),
}
}
#[inline]
pub fn get_mut(&mut self) -> &mut BytesMut {
match *self {
ContentEncoder::Br(ref mut encoder) =>
encoder.get_mut().buffer.get_mut(),
ContentEncoder::Deflate(ref mut encoder) =>
encoder.get_mut().buffer.get_mut(),
ContentEncoder::Gzip(ref mut encoder) =>
encoder.get_mut().buffer.get_mut(),
ContentEncoder::Identity(ref mut encoder) =>
encoder.buffer.get_mut(),
ContentEncoder::Br(ref encoder) => encoder.get_ref().is_eof(),
ContentEncoder::Deflate(ref encoder) => encoder.get_ref().is_eof(),
ContentEncoder::Gzip(ref encoder) => encoder.get_ref().is_eof(),
ContentEncoder::Identity(ref encoder) => encoder.is_eof(),
}
}
@ -629,10 +605,10 @@ impl ContentEncoder {
#[cfg_attr(feature = "cargo-clippy", allow(inline_always))]
#[inline(always)]
pub fn write(&mut self, data: &[u8]) -> Result<(), io::Error> {
pub fn write(&mut self, data: Binary) -> Result<(), io::Error> {
match *self {
ContentEncoder::Br(ref mut encoder) => {
match encoder.write(data) {
match encoder.write(data.as_ref()) {
Ok(_) =>
encoder.flush(),
Err(err) => {
@ -642,7 +618,7 @@ impl ContentEncoder {
}
},
ContentEncoder::Gzip(ref mut encoder) => {
match encoder.write(data) {
match encoder.write(data.as_ref()) {
Ok(_) =>
encoder.flush(),
Err(err) => {
@ -652,7 +628,7 @@ impl ContentEncoder {
}
}
ContentEncoder::Deflate(ref mut encoder) => {
match encoder.write(data) {
match encoder.write(data.as_ref()) {
Ok(_) =>
encoder.flush(),
Err(err) => {
@ -686,7 +662,7 @@ enum TransferEncodingKind {
Length(u64),
/// An Encoder for when Content-Length is not known.
///
/// Appliction decides when to stop writing.
/// Application decides when to stop writing.
Eof,
}
@ -727,11 +703,12 @@ impl TransferEncoding {
/// Encode message. Return `EOF` state of encoder
#[inline]
pub fn encode(&mut self, msg: &[u8]) -> io::Result<bool> {
pub fn encode(&mut self, mut msg: Binary) -> io::Result<bool> {
match self.kind {
TransferEncodingKind::Eof => {
self.buffer.get_mut().extend_from_slice(msg);
Ok(msg.is_empty())
let eof = msg.is_empty();
self.buffer.extend(msg);
Ok(eof)
},
TransferEncodingKind::Chunked(ref mut eof) => {
if *eof {
@ -740,24 +717,31 @@ impl TransferEncoding {
if msg.is_empty() {
*eof = true;
self.buffer.get_mut().extend_from_slice(b"0\r\n\r\n");
self.buffer.extend_from_slice(b"0\r\n\r\n");
} else {
write!(self.buffer.get_mut(), "{:X}\r\n", msg.len())
let mut buf = BytesMut::new();
write!(&mut buf, "{:X}\r\n", msg.len())
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
self.buffer.get_mut().extend_from_slice(msg);
self.buffer.get_mut().extend_from_slice(b"\r\n");
self.buffer.reserve(buf.len() + msg.len() + 2);
self.buffer.extend(buf.into());
self.buffer.extend(msg);
self.buffer.extend_from_slice(b"\r\n");
}
Ok(*eof)
},
TransferEncodingKind::Length(ref mut remaining) => {
if msg.is_empty() {
return Ok(*remaining == 0)
}
let max = cmp::min(*remaining, msg.len() as u64);
self.buffer.get_mut().extend_from_slice(msg[..max as usize].as_ref());
if *remaining > 0 {
if msg.is_empty() {
return Ok(*remaining == 0)
}
let len = cmp::min(*remaining, msg.len() as u64);
self.buffer.extend(msg.take().split_to(len as usize).into());
*remaining -= max as u64;
Ok(*remaining == 0)
*remaining -= len as u64;
Ok(*remaining == 0)
} else {
Ok(true)
}
},
}
}
@ -770,7 +754,7 @@ impl TransferEncoding {
TransferEncodingKind::Chunked(ref mut eof) => {
if !*eof {
*eof = true;
self.buffer.get_mut().extend_from_slice(b"0\r\n\r\n");
self.buffer.extend_from_slice(b"0\r\n\r\n");
}
},
}
@ -781,7 +765,7 @@ impl io::Write for TransferEncoding {
#[inline]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.encode(buf)?;
self.encode(Binary::from_slice(buf))?;
Ok(buf.len())
}
@ -867,8 +851,8 @@ mod tests {
fn test_chunked_te() {
let bytes = SharedBytes::default();
let mut enc = TransferEncoding::chunked(bytes.clone());
assert!(!enc.encode(b"test").ok().unwrap());
assert!(enc.encode(b"").ok().unwrap());
assert!(!enc.encode(Binary::from(b"test".as_ref())).ok().unwrap());
assert!(enc.encode(Binary::from(b"".as_ref())).ok().unwrap());
assert_eq!(bytes.get_mut().take().freeze(),
Bytes::from_static(b"4\r\ntest\r\n0\r\n\r\n"));
}

View File

@ -96,12 +96,12 @@ impl<T, H> Http1<T, H>
}
}
// TODO: refacrtor
// TODO: refactor
#[cfg_attr(feature = "cargo-clippy", allow(cyclomatic_complexity))]
pub fn poll(&mut self) -> Poll<(), ()> {
// keep-alive timer
if self.keepalive_timer.is_some() {
match self.keepalive_timer.as_mut().unwrap().poll() {
if let Some(ref mut timer) = self.keepalive_timer {
match timer.poll() {
Ok(Async::Ready(_)) => {
trace!("Keep-alive timeout, close connection");
return Ok(Async::Ready(()))
@ -133,7 +133,7 @@ impl<T, H> Http1<T, H>
Ok(Async::Ready(ready)) => {
not_ready = false;
// overide keep-alive state
// override keep-alive state
if self.stream.keepalive() {
self.flags.insert(Flags::KEEPALIVE);
} else {
@ -146,10 +146,8 @@ impl<T, H> Http1<T, H>
item.flags.insert(EntryFlags::FINISHED);
}
},
Ok(Async::NotReady) => {
// no more IO for this iteration
io = true;
},
// no more IO for this iteration
Ok(Async::NotReady) => io = true,
Err(err) => {
// it is not possible to recover from error
// during pipe handling, so just drop connection
@ -227,38 +225,7 @@ impl<T, H> Http1<T, H>
self.tasks.push_back(
Entry {pipe: pipe.unwrap_or_else(|| Pipeline::error(HTTPNotFound)),
flags: EntryFlags::empty()});
}
Err(ReaderError::Disconnect) => {
not_ready = false;
self.flags.insert(Flags::ERROR);
self.stream.disconnected();
for entry in &mut self.tasks {
entry.pipe.disconnected()
}
},
Err(err) => {
// notify all tasks
not_ready = false;
self.stream.disconnected();
for entry in &mut self.tasks {
entry.pipe.disconnected()
}
// kill keepalive
self.flags.remove(Flags::KEEPALIVE);
self.keepalive_timer.take();
// on parse error, stop reading stream but tasks need to be completed
self.flags.insert(Flags::ERROR);
if self.tasks.is_empty() {
if let ReaderError::Error(err) = err {
self.tasks.push_back(
Entry {pipe: Pipeline::error(err.error_response()),
flags: EntryFlags::empty()});
}
}
}
Ok(Async::NotReady) => {
// start keep-alive timer, this also is slow request timeout
if self.tasks.is_empty() {
@ -293,7 +260,38 @@ impl<T, H> Http1<T, H>
}
}
break
}
},
Err(ReaderError::Disconnect) => {
not_ready = false;
self.flags.insert(Flags::ERROR);
self.stream.disconnected();
for entry in &mut self.tasks {
entry.pipe.disconnected()
}
},
Err(err) => {
// notify all tasks
not_ready = false;
self.stream.disconnected();
for entry in &mut self.tasks {
entry.pipe.disconnected()
}
// kill keepalive
self.flags.remove(Flags::KEEPALIVE);
self.keepalive_timer.take();
// on parse error, stop reading stream but tasks need to be completed
self.flags.insert(Flags::ERROR);
if self.tasks.is_empty() {
if let ReaderError::Error(err) = err {
self.tasks.push_back(
Entry {pipe: Pipeline::error(err.error_response()),
flags: EntryFlags::empty()});
}
}
},
}
}
@ -1204,6 +1202,7 @@ mod tests {
panic!("Error");
}
// type in chunked
let mut buf = Buffer::new(
"GET /test HTTP/1.1\r\n\
transfer-encoding: chnked\r\n\r\n");

View File

@ -2,15 +2,15 @@ use std::io;
use bytes::BufMut;
use futures::{Async, Poll};
use tokio_io::AsyncWrite;
use http::Version;
use http::{Method, Version};
use http::header::{HeaderValue, CONNECTION, DATE};
use helpers;
use body::Body;
use helpers::SharedBytes;
use body::{Body, Binary};
use httprequest::HttpMessage;
use httpresponse::HttpResponse;
use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE};
use super::shared::SharedBytes;
use super::encoding::PayloadEncoder;
const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific
@ -56,23 +56,25 @@ impl<T: AsyncWrite> H1Writer<T> {
}
pub fn disconnected(&mut self) {
self.encoder.get_mut().take();
self.buffer.take();
}
pub fn keepalive(&self) -> bool {
self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE)
}
fn write_to_stream(&mut self) -> Result<WriterState, io::Error> {
let buffer = self.encoder.get_mut();
while !buffer.is_empty() {
match self.stream.write(buffer.as_ref()) {
fn write_to_stream(&mut self) -> io::Result<WriterState> {
while !self.buffer.is_empty() {
match self.stream.write(self.buffer.as_ref()) {
Ok(0) => {
self.disconnected();
return Ok(WriterState::Done);
},
Ok(n) => {
let _ = buffer.split_to(n);
let _ = self.buffer.split_to(n);
},
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if buffer.len() > MAX_WRITE_BUFFER_SIZE {
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
return Ok(WriterState::Pause)
} else {
return Ok(WriterState::Done)
@ -92,23 +94,7 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
self.written
}
#[inline]
fn flush(&mut self) -> Poll<(), io::Error> {
match self.stream.flush() {
Ok(_) => Ok(Async::Ready(())),
Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock {
Ok(Async::NotReady)
} else {
Err(e)
}
}
}
}
fn start(&mut self, req: &mut HttpMessage, msg: &mut HttpResponse)
-> Result<WriterState, io::Error>
{
fn start(&mut self, req: &mut HttpMessage, msg: &mut HttpResponse) -> io::Result<WriterState> {
// prepare task
self.flags.insert(Flags::STARTED);
self.encoder = PayloadEncoder::new(self.buffer.clone(), req, msg);
@ -133,7 +119,7 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
// render message
{
let mut buffer = self.encoder.get_mut();
let mut buffer = self.buffer.get_mut();
if let Body::Binary(ref bytes) = body {
buffer.reserve(256 + msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len());
} else {
@ -146,7 +132,11 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
match body {
Body::Empty =>
buffer.extend_from_slice(b"\r\ncontent-length: 0\r\n"),
if req.method != Method::HEAD {
buffer.extend_from_slice(b"\r\ncontent-length: 0\r\n");
} else {
buffer.extend_from_slice(b"\r\n");
},
Body::Binary(ref bytes) =>
helpers::write_content_length(bytes.len(), &mut buffer),
_ =>
@ -176,14 +166,14 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
if let Body::Binary(bytes) = body {
self.written = bytes.len() as u64;
self.encoder.write(bytes.as_ref())?;
self.encoder.write(bytes)?;
} else {
msg.replace_body(body);
}
Ok(WriterState::Done)
}
fn write(&mut self, payload: &[u8]) -> Result<WriterState, io::Error> {
fn write(&mut self, payload: Binary) -> io::Result<WriterState> {
self.written += payload.len() as u64;
if !self.flags.contains(Flags::DISCONNECTED) {
if self.flags.contains(Flags::STARTED) {
@ -192,24 +182,24 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
return Ok(WriterState::Done)
} else {
// might be response to EXCEPT
self.encoder.get_mut().extend_from_slice(payload)
self.buffer.extend_from_slice(payload.as_ref())
}
}
if self.encoder.len() > MAX_WRITE_BUFFER_SIZE {
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
Ok(WriterState::Pause)
} else {
Ok(WriterState::Done)
}
}
fn write_eof(&mut self) -> Result<WriterState, io::Error> {
fn write_eof(&mut self) -> io::Result<WriterState> {
self.encoder.write_eof()?;
if !self.encoder.is_eof() {
Err(io::Error::new(io::ErrorKind::Other,
"Last payload item, but eof is not reached"))
} else if self.encoder.len() > MAX_WRITE_BUFFER_SIZE {
} else if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
Ok(WriterState::Pause)
} else {
Ok(WriterState::Done)

View File

@ -7,11 +7,11 @@ use http::{Version, HttpTryFrom, Response};
use http::header::{HeaderValue, CONNECTION, TRANSFER_ENCODING, DATE, CONTENT_LENGTH};
use helpers;
use body::Body;
use helpers::SharedBytes;
use body::{Body, Binary};
use httprequest::HttpMessage;
use httpresponse::HttpResponse;
use super::encoding::PayloadEncoder;
use super::shared::SharedBytes;
use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE};
const CHUNK_SIZE: usize = 16_384;
@ -52,15 +52,13 @@ impl H2Writer {
}
}
fn write_to_stream(&mut self) -> Result<WriterState, io::Error> {
fn write_to_stream(&mut self) -> io::Result<WriterState> {
if !self.flags.contains(Flags::STARTED) {
return Ok(WriterState::Done)
}
if let Some(ref mut stream) = self.stream {
let buffer = self.encoder.get_mut();
if buffer.is_empty() {
if self.buffer.is_empty() {
if self.flags.contains(Flags::EOF) {
let _ = stream.send_data(Bytes::new(), true);
}
@ -70,7 +68,7 @@ impl H2Writer {
loop {
match stream.poll_capacity() {
Ok(Async::NotReady) => {
if buffer.len() > MAX_WRITE_BUFFER_SIZE {
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
return Ok(WriterState::Pause)
} else {
return Ok(WriterState::Done)
@ -80,15 +78,15 @@ impl H2Writer {
return Ok(WriterState::Done)
}
Ok(Async::Ready(Some(cap))) => {
let len = buffer.len();
let bytes = buffer.split_to(cmp::min(cap, len));
let eof = buffer.is_empty() && self.flags.contains(Flags::EOF);
let len = self.buffer.len();
let bytes = self.buffer.split_to(cmp::min(cap, len));
let eof = self.buffer.is_empty() && self.flags.contains(Flags::EOF);
self.written += bytes.len() as u64;
if let Err(err) = stream.send_data(bytes.freeze(), eof) {
return Err(io::Error::new(io::ErrorKind::Other, err))
} else if !buffer.is_empty() {
let cap = cmp::min(buffer.len(), CHUNK_SIZE);
} else if !self.buffer.is_empty() {
let cap = cmp::min(self.buffer.len(), CHUNK_SIZE);
stream.reserve_capacity(cap);
} else {
return Ok(WriterState::Pause)
@ -110,16 +108,7 @@ impl Writer for H2Writer {
self.written
}
#[inline]
fn flush(&mut self) -> Poll<(), io::Error> {
Ok(Async::Ready(()))
}
fn start(&mut self, req: &mut HttpMessage, msg: &mut HttpResponse)
-> Result<WriterState, io::Error>
{
// trace!("Prepare response with status: {:?}", msg.status());
fn start(&mut self, req: &mut HttpMessage, msg: &mut HttpResponse) -> io::Result<WriterState> {
// prepare response
self.flags.insert(Flags::STARTED);
self.encoder = PayloadEncoder::new(self.buffer.clone(), req, msg);
@ -172,9 +161,9 @@ impl Writer for H2Writer {
if let Body::Binary(bytes) = body {
self.flags.insert(Flags::EOF);
self.written = bytes.len() as u64;
self.encoder.write(bytes.as_ref())?;
self.encoder.write(bytes)?;
if let Some(ref mut stream) = self.stream {
stream.reserve_capacity(cmp::min(self.encoder.len(), CHUNK_SIZE));
stream.reserve_capacity(cmp::min(self.buffer.len(), CHUNK_SIZE));
}
Ok(WriterState::Pause)
} else {
@ -183,7 +172,7 @@ impl Writer for H2Writer {
}
}
fn write(&mut self, payload: &[u8]) -> Result<WriterState, io::Error> {
fn write(&mut self, payload: Binary) -> io::Result<WriterState> {
self.written = payload.len() as u64;
if !self.flags.contains(Flags::DISCONNECTED) {
@ -192,25 +181,25 @@ impl Writer for H2Writer {
self.encoder.write(payload)?;
} else {
// might be response for EXCEPT
self.encoder.get_mut().extend_from_slice(payload)
self.buffer.extend_from_slice(payload.as_ref())
}
}
if self.encoder.len() > MAX_WRITE_BUFFER_SIZE {
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
Ok(WriterState::Pause)
} else {
Ok(WriterState::Done)
}
}
fn write_eof(&mut self) -> Result<WriterState, io::Error> {
fn write_eof(&mut self) -> io::Result<WriterState> {
self.encoder.write_eof()?;
self.flags.insert(Flags::EOF);
if !self.encoder.is_eof() {
Err(io::Error::new(io::ErrorKind::Other,
"Last payload item, but eof is not reached"))
} else if self.encoder.len() > MAX_WRITE_BUFFER_SIZE {
} else if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
Ok(WriterState::Pause)
} else {
Ok(WriterState::Done)

View File

@ -15,11 +15,13 @@ mod h2;
mod h1writer;
mod h2writer;
mod settings;
mod shared;
mod utils;
pub use self::srv::HttpServer;
pub use self::settings::ServerSettings;
use body::Binary;
use error::Error;
use httprequest::{HttpMessage, HttpRequest};
use httpresponse::HttpResponse;
@ -54,6 +56,12 @@ pub trait HttpHandler: 'static {
fn handle(&mut self, req: HttpRequest) -> Result<Box<HttpHandlerTask>, HttpRequest>;
}
impl HttpHandler for Box<HttpHandler> {
fn handle(&mut self, req: HttpRequest) -> Result<Box<HttpHandlerTask>, HttpRequest> {
self.as_mut().handle(req)
}
}
pub trait HttpHandlerTask {
fn poll(&mut self) -> Poll<(), Error>;
@ -90,14 +98,11 @@ pub enum WriterState {
pub trait Writer {
fn written(&self) -> u64;
fn start(&mut self, req: &mut HttpMessage, resp: &mut HttpResponse)
-> Result<WriterState, io::Error>;
fn start(&mut self, req: &mut HttpMessage, resp: &mut HttpResponse) -> io::Result<WriterState>;
fn write(&mut self, payload: &[u8]) -> Result<WriterState, io::Error>;
fn write(&mut self, payload: Binary) -> io::Result<WriterState>;
fn write_eof(&mut self) -> Result<WriterState, io::Error>;
fn flush(&mut self) -> Poll<(), io::Error>;
fn write_eof(&mut self) -> io::Result<WriterState>;
fn poll_completed(&mut self, shutdown: bool) -> Poll<(), io::Error>;
}

View File

@ -4,6 +4,7 @@ use std::cell::{Cell, RefCell, RefMut};
use helpers;
use super::channel::Node;
use super::shared::{SharedBytes, SharedBytesPool};
/// Various server settings
#[derive(Debug, Clone)]
@ -63,7 +64,7 @@ pub(crate) struct WorkerSettings<H> {
h: RefCell<Vec<H>>,
enabled: bool,
keep_alive: u64,
bytes: Rc<helpers::SharedBytesPool>,
bytes: Rc<SharedBytesPool>,
messages: Rc<helpers::SharedMessagePool>,
channels: Cell<usize>,
node: Node<()>,
@ -75,7 +76,7 @@ impl<H> WorkerSettings<H> {
h: RefCell::new(h),
enabled: if let Some(ka) = keep_alive { ka > 0 } else { false },
keep_alive: keep_alive.unwrap_or(0),
bytes: Rc::new(helpers::SharedBytesPool::new()),
bytes: Rc::new(SharedBytesPool::new()),
messages: Rc::new(helpers::SharedMessagePool::new()),
channels: Cell::new(0),
node: Node::head(),
@ -102,8 +103,8 @@ impl<H> WorkerSettings<H> {
self.enabled
}
pub fn get_shared_bytes(&self) -> helpers::SharedBytes {
helpers::SharedBytes::new(self.bytes.get_bytes(), Rc::clone(&self.bytes))
pub fn get_shared_bytes(&self) -> SharedBytes {
SharedBytes::new(self.bytes.get_bytes(), Rc::clone(&self.bytes))
}
pub fn get_http_message(&self) -> helpers::SharedHttpMessage {

120
src/server/shared.rs Normal file
View File

@ -0,0 +1,120 @@
use std::mem;
use std::cell::RefCell;
use std::rc::Rc;
use std::collections::VecDeque;
use bytes::BytesMut;
use body::Binary;
/// Internal use only! unsafe
#[derive(Debug)]
pub(crate) struct SharedBytesPool(RefCell<VecDeque<Rc<BytesMut>>>);
impl SharedBytesPool {
pub fn new() -> SharedBytesPool {
SharedBytesPool(RefCell::new(VecDeque::with_capacity(128)))
}
pub fn get_bytes(&self) -> Rc<BytesMut> {
if let Some(bytes) = self.0.borrow_mut().pop_front() {
bytes
} else {
Rc::new(BytesMut::new())
}
}
pub fn release_bytes(&self, mut bytes: Rc<BytesMut>) {
let v = &mut self.0.borrow_mut();
if v.len() < 128 {
Rc::get_mut(&mut bytes).unwrap().take();
v.push_front(bytes);
}
}
}
#[derive(Debug)]
pub(crate) struct SharedBytes(
Option<Rc<BytesMut>>, Option<Rc<SharedBytesPool>>);
impl Drop for SharedBytes {
fn drop(&mut self) {
if let Some(ref pool) = self.1 {
if let Some(bytes) = self.0.take() {
if Rc::strong_count(&bytes) == 1 {
pool.release_bytes(bytes);
}
}
}
}
}
impl SharedBytes {
pub fn empty() -> Self {
SharedBytes(None, None)
}
pub fn new(bytes: Rc<BytesMut>, pool: Rc<SharedBytesPool>) -> SharedBytes {
SharedBytes(Some(bytes), Some(pool))
}
#[inline(always)]
#[allow(mutable_transmutes)]
#[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref, inline_always))]
pub fn get_mut(&self) -> &mut BytesMut {
let r: &BytesMut = self.0.as_ref().unwrap().as_ref();
unsafe{mem::transmute(r)}
}
#[inline]
pub fn len(&self) -> usize {
self.0.as_ref().unwrap().len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.0.as_ref().unwrap().is_empty()
}
#[inline]
pub fn as_ref(&self) -> &[u8] {
self.0.as_ref().unwrap().as_ref()
}
pub fn split_to(&self, n: usize) -> BytesMut {
self.get_mut().split_to(n)
}
pub fn take(&self) -> BytesMut {
self.get_mut().take()
}
#[inline]
pub fn reserve(&self, cnt: usize) {
self.get_mut().reserve(cnt)
}
#[inline]
#[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))]
pub fn extend(&self, data: Binary) {
self.get_mut().extend_from_slice(data.as_ref());
}
#[inline]
pub fn extend_from_slice(&self, data: &[u8]) {
self.get_mut().extend_from_slice(data);
}
}
impl Default for SharedBytes {
fn default() -> Self {
SharedBytes(Some(Rc::new(BytesMut::new())), None)
}
}
impl Clone for SharedBytes {
fn clone(&self) -> SharedBytes {
SharedBytes(self.0.clone(), self.1.clone())
}
}

View File

@ -268,9 +268,9 @@ impl<H: HttpHandler, U, V> HttpServer<TcpStream, net::SocketAddr, H, U>
where U: IntoIterator<Item=V> + 'static,
V: IntoHttpHandler<Handler=H>,
{
/// Start listening for incomming connections.
/// Start listening for incoming connections.
///
/// This method starts number of http handler workers in seperate threads.
/// This method starts number of http handler workers in separate threads.
/// For each address this method starts separate thread which does `accept()` in a loop.
///
/// This methods panics if no socket addresses get bound.
@ -298,7 +298,7 @@ impl<H: HttpHandler, U, V> HttpServer<TcpStream, net::SocketAddr, H, U>
pub fn start(mut self) -> SyncAddress<Self>
{
if self.sockets.is_empty() {
panic!("HttpServer::bind() has to be called befor start()");
panic!("HttpServer::bind() has to be called before start()");
} else {
let addrs: Vec<(net::SocketAddr, net::TcpListener)> =
self.sockets.drain().collect();
@ -320,7 +320,7 @@ impl<H: HttpHandler, U, V> HttpServer<TcpStream, net::SocketAddr, H, U>
}
}
/// Spawn new thread and start listening for incomming connections.
/// Spawn new thread and start listening for incoming connections.
///
/// This method spawns new thread and starts new actix system. Other than that it is
/// similar to `start()` method. This method blocks.
@ -359,7 +359,7 @@ impl<H: HttpHandler, U, V> HttpServer<TlsStream<TcpStream>, net::SocketAddr, H,
where U: IntoIterator<Item=V> + 'static,
V: IntoHttpHandler<Handler=H>,
{
/// Start listening for incomming tls connections.
/// Start listening for incoming tls connections.
pub fn start_tls(mut self, pkcs12: ::Pkcs12) -> io::Result<SyncAddress<Self>> {
if self.sockets.is_empty() {
Err(io::Error::new(io::ErrorKind::Other, "No socket addresses are bound"))
@ -398,7 +398,7 @@ impl<H: HttpHandler, U, V> HttpServer<SslStream<TcpStream>, net::SocketAddr, H,
where U: IntoIterator<Item=V> + 'static,
V: IntoHttpHandler<Handler=H>,
{
/// Start listening for incomming tls connections.
/// Start listening for incoming tls connections.
///
/// This method sets alpn protocols to "h2" and "http/1.1"
pub fn start_ssl(mut self, identity: &ParsedPkcs12) -> io::Result<SyncAddress<Self>> {
@ -443,7 +443,7 @@ impl<T, A, H, U, V> HttpServer<WrapperStream<T>, A, H, U>
U: IntoIterator<Item=V> + 'static,
V: IntoHttpHandler<Handler=H>,
{
/// Start listening for incomming connections from a stream.
/// Start listening for incoming connections from a stream.
///
/// This method uses only one thread for handling incoming connections.
pub fn start_incoming<S>(mut self, stream: S, secure: bool) -> SyncAddress<Self>
@ -663,7 +663,7 @@ fn start_accept_thread(sock: net::TcpListener, addr: net::SocketAddr, backlog: i
}
}
// Start listening for incommin commands
// Start listening for incoming commands
if let Err(err) = poll.register(&reg, CMD,
mio::Ready::readable(), mio::PollOpt::edge()) {
panic!("Can not register Registration: {}", err);

View File

@ -29,7 +29,7 @@ use server::{HttpServer, HttpHandler, IntoHttpHandler, ServerSettings};
/// The `TestServer` type.
///
/// `TestServer` is very simple test server that simplify process of writing
/// integrational tests cases for actix web applications.
/// integration tests cases for actix web applications.
///
/// # Examples
///
@ -61,7 +61,7 @@ impl TestServer {
/// Start new test server
///
/// This methos accepts configuration method. You can add
/// This method accepts configuration method. You can add
/// middlewares or set handlers for test application.
pub fn new<F>(config: F) -> Self
where F: Sync + Send + 'static + Fn(&mut TestApp<()>),
@ -101,7 +101,7 @@ impl TestServer {
/// Start new test server with custom application state
///
/// This methos accepts state factory and configuration method.
/// This method accepts state factory and configuration method.
pub fn with_state<S, FS, F>(state: FS, config: F) -> Self
where S: 'static,
FS: Sync + Send + 'static + Fn() -> S,
@ -287,12 +287,12 @@ impl Default for TestRequest<()> {
impl TestRequest<()> {
/// Create TestReqeust and set request uri
/// Create TestRequest and set request uri
pub fn with_uri(path: &str) -> TestRequest<()> {
TestRequest::default().uri(path)
}
/// Create TestReqeust and set header
/// Create TestRequest and set header
pub fn with_header<K, V>(key: K, value: V) -> TestRequest<()>
where HeaderName: HttpTryFrom<K>,
HeaderValue: HttpTryFrom<V>

View File

@ -5,14 +5,7 @@ use bytes::BytesMut;
use body::Binary;
use ws::proto::{OpCode, CloseCode};
fn apply_mask(buf: &mut [u8], mask: &[u8; 4]) {
let iter = buf.iter_mut().zip(mask.iter().cycle());
for (byte, &key) in iter {
*byte ^= key
}
}
use ws::mask::apply_mask;
/// A struct representing a `WebSocket` frame.
#[derive(Debug)]
@ -28,7 +21,7 @@ pub(crate) struct Frame {
impl Frame {
/// Desctructe frame
/// Destruct frame
pub fn unpack(self) -> (bool, OpCode, Binary) {
(self.finished, self.opcode, self.payload)
}

120
src/ws/mask.rs Normal file
View File

@ -0,0 +1,120 @@
//! This is code from [Tungstenite project](https://github.com/snapview/tungstenite-rs)
use std::cmp::min;
use std::mem::uninitialized;
use std::ptr::copy_nonoverlapping;
/// Mask/unmask a frame.
#[inline]
pub fn apply_mask(buf: &mut [u8], mask: &[u8; 4]) {
apply_mask_fast32(buf, mask)
}
/// A safe unoptimized mask application.
#[inline]
#[allow(dead_code)]
fn apply_mask_fallback(buf: &mut [u8], mask: &[u8; 4]) {
for (i, byte) in buf.iter_mut().enumerate() {
*byte ^= mask[i & 3];
}
}
/// Faster version of `apply_mask()` which operates on 4-byte blocks.
#[inline]
#[allow(dead_code)]
fn apply_mask_fast32(buf: &mut [u8], mask: &[u8; 4]) {
// TODO replace this with read_unaligned() as it stabilizes.
let mask_u32 = unsafe {
let mut m: u32 = uninitialized();
#[allow(trivial_casts)]
copy_nonoverlapping(mask.as_ptr(), &mut m as *mut _ as *mut u8, 4);
m
};
let mut ptr = buf.as_mut_ptr();
let mut len = buf.len();
// Possible first unaligned block.
let head = min(len, (4 - (ptr as usize & 3)) & 3);
let mask_u32 = if head > 0 {
unsafe {
xor_mem(ptr, mask_u32, head);
ptr = ptr.offset(head as isize);
}
len -= head;
if cfg!(target_endian = "big") {
mask_u32.rotate_left(8 * head as u32)
} else {
mask_u32.rotate_right(8 * head as u32)
}
} else {
mask_u32
};
if len > 0 {
debug_assert_eq!(ptr as usize % 4, 0);
}
// Properly aligned middle of the data.
while len > 4 {
unsafe {
*(ptr as *mut u32) ^= mask_u32;
ptr = ptr.offset(4);
len -= 4;
}
}
// Possible last block.
if len > 0 {
unsafe { xor_mem(ptr, mask_u32, len); }
}
}
#[inline]
// TODO: copy_nonoverlapping here compiles to call memcpy. While it is not so inefficient,
// it could be done better. The compiler does not see that len is limited to 3.
unsafe fn xor_mem(ptr: *mut u8, mask: u32, len: usize) {
let mut b: u32 = uninitialized();
#[allow(trivial_casts)]
copy_nonoverlapping(ptr, &mut b as *mut _ as *mut u8, len);
b ^= mask;
#[allow(trivial_casts)]
copy_nonoverlapping(&b as *const _ as *const u8, ptr, len);
}
#[cfg(test)]
mod tests {
use super::{apply_mask_fallback, apply_mask_fast32};
#[test]
fn test_apply_mask() {
let mask = [
0x6d, 0xb6, 0xb2, 0x80,
];
let unmasked = vec![
0xf3, 0x00, 0x01, 0x02, 0x03, 0x80, 0x81, 0x82,
0xff, 0xfe, 0x00, 0x17, 0x74, 0xf9, 0x12, 0x03,
];
// Check masking with proper alignment.
{
let mut masked = unmasked.clone();
apply_mask_fallback(&mut masked, &mask);
let mut masked_fast = unmasked.clone();
apply_mask_fast32(&mut masked_fast, &mask);
assert_eq!(masked, masked_fast);
}
// Check masking without alignment.
{
let mut masked = unmasked.clone();
apply_mask_fallback(&mut masked[1..], &mask);
let mut masked_fast = unmasked.clone();
apply_mask_fast32(&mut masked_fast[1..], &mask);
assert_eq!(masked, masked_fast);
}
}
}

View File

@ -58,6 +58,7 @@ use httpresponse::{ConnectionType, HttpResponse, HttpResponseBuilder};
mod frame;
mod proto;
mod context;
mod mask;
use ws::frame::Frame;
use ws::proto::{hash_key, OpCode};

View File

@ -152,6 +152,66 @@ fn test_body_br_streaming() {
assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref()));
}
#[test]
fn test_head_empty() {
let srv = test::TestServer::new(
|app| app.handler(|_| {
httpcodes::HTTPOk.build()
.content_length(STR.len() as u64).finish()}));
let client = reqwest::Client::new();
let mut res = client.head(&srv.url("/")).send().unwrap();
assert!(res.status().is_success());
let mut bytes = BytesMut::with_capacity(2048).writer();
let len = res.headers()
.get::<reqwest::header::ContentLength>().map(|ct_len| **ct_len).unwrap();
assert_eq!(len, STR.len() as u64);
let _ = res.copy_to(&mut bytes);
let bytes = bytes.into_inner();
assert!(bytes.is_empty());
}
#[test]
fn test_head_binary() {
let srv = test::TestServer::new(
|app| app.handler(|_| {
httpcodes::HTTPOk.build()
.content_encoding(headers::ContentEncoding::Identity)
.content_length(100).body(STR)}));
let client = reqwest::Client::new();
let mut res = client.head(&srv.url("/")).send().unwrap();
assert!(res.status().is_success());
let mut bytes = BytesMut::with_capacity(2048).writer();
let len = res.headers()
.get::<reqwest::header::ContentLength>().map(|ct_len| **ct_len).unwrap();
assert_eq!(len, STR.len() as u64);
let _ = res.copy_to(&mut bytes);
let bytes = bytes.into_inner();
assert!(bytes.is_empty());
}
#[test]
fn test_head_binary2() {
let srv = test::TestServer::new(
|app| app.handler(|_| {
httpcodes::HTTPOk.build()
.content_encoding(headers::ContentEncoding::Identity)
.body(STR)
}));
let client = reqwest::Client::new();
let mut res = client.head(&srv.url("/")).send().unwrap();
assert!(res.status().is_success());
let mut bytes = BytesMut::with_capacity(2048).writer();
let len = res.headers()
.get::<reqwest::header::ContentLength>().map(|ct_len| **ct_len).unwrap();
assert_eq!(len, STR.len() as u64);
let _ = res.copy_to(&mut bytes);
let bytes = bytes.into_inner();
assert!(bytes.is_empty());
}
#[test]
fn test_body_length() {
let srv = test::TestServer::new(