mirror of
https://github.com/fafhrd91/actix-web
synced 2025-07-15 06:26:11 +02:00
Compare commits
27 Commits
Author | SHA1 | Date | |
---|---|---|---|
2227120ae0 | |||
21c8c0371d | |||
1914a6a0d8 | |||
1cff4619e7 | |||
7bb7adf89c | |||
f55ff24925 | |||
f5f78d79e6 | |||
9180625dfd | |||
552320bae2 | |||
7cf221f767 | |||
98931a8623 | |||
ae10a89014 | |||
71d534dadb | |||
867bb1d409 | |||
91c44a1cf1 | |||
3bc60a8d5d | |||
58df8fa4b9 | |||
81f92b43e5 | |||
e1d9c3803b | |||
a7c24aace1 | |||
89a89e7b18 | |||
3425f7be40 | |||
09a6f8a34f | |||
7060f298b4 | |||
33dbe15760 | |||
e95c7dfc29 | |||
927a92fcac |
@ -46,7 +46,9 @@ script:
|
||||
cargo clean
|
||||
USE_SKEPTIC=1 cargo test --features=alpn
|
||||
else
|
||||
cargo test --features=alpn
|
||||
cargo clean
|
||||
cargo test
|
||||
# --features=alpn
|
||||
fi
|
||||
|
||||
- |
|
||||
|
13
CHANGES.md
13
CHANGES.md
@ -1,5 +1,18 @@
|
||||
# Changes
|
||||
|
||||
## 0.3.2 (2018-01-21)
|
||||
|
||||
* Fix HEAD requests handling
|
||||
|
||||
* Log request processing errors
|
||||
|
||||
* Always enable content encoding if encoding explicitly selected
|
||||
|
||||
* Allow multiple Applications on a single server with different state #49
|
||||
|
||||
* CORS middleware: allowed_headers is defaulting to None #50
|
||||
|
||||
|
||||
## 0.3.1 (2018-01-13)
|
||||
|
||||
* Fix directory entry path #47
|
||||
|
@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "actix-web"
|
||||
version = "0.3.1"
|
||||
version = "0.3.2"
|
||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
description = "Actix web framework"
|
||||
readme = "README.md"
|
||||
@ -12,7 +12,7 @@ categories = ["network-programming", "asynchronous",
|
||||
"web-programming::http-server", "web-programming::websocket"]
|
||||
license = "MIT/Apache-2.0"
|
||||
exclude = [".gitignore", ".travis.yml", ".cargo/config",
|
||||
"appveyor.yml", "./examples/static/*"]
|
||||
"appveyor.yml", "/examples/**"]
|
||||
build = "build.rs"
|
||||
|
||||
[badges]
|
||||
@ -81,7 +81,7 @@ version = "0.9"
|
||||
optional = true
|
||||
|
||||
[dev-dependencies]
|
||||
env_logger = "0.4"
|
||||
env_logger = "0.5"
|
||||
reqwest = "0.8"
|
||||
skeptic = "0.13"
|
||||
serde_derive = "1.0"
|
||||
|
@ -6,6 +6,6 @@ workspace = "../.."
|
||||
|
||||
[dependencies]
|
||||
futures = "*"
|
||||
env_logger = "0.4"
|
||||
env_logger = "0.5"
|
||||
actix = "0.4"
|
||||
actix-web = { path = "../../" }
|
||||
actix-web = { path="../.." }
|
||||
|
@ -7,6 +7,7 @@ extern crate env_logger;
|
||||
extern crate futures;
|
||||
use futures::Stream;
|
||||
|
||||
use std::{io, env};
|
||||
use actix_web::*;
|
||||
use actix_web::middleware::RequestSession;
|
||||
use futures::future::{FutureResult, result};
|
||||
@ -56,17 +57,17 @@ fn index(mut req: HttpRequest) -> Result<HttpResponse> {
|
||||
fn p404(req: HttpRequest) -> Result<HttpResponse> {
|
||||
|
||||
// html
|
||||
let html = format!(r#"<!DOCTYPE html><html><head><title>actix - basics</title><link rel="shortcut icon" type="image/x-icon" href="/favicon.ico" /></head>
|
||||
let html = r#"<!DOCTYPE html><html><head><title>actix - basics</title><link rel="shortcut icon" type="image/x-icon" href="/favicon.ico" /></head>
|
||||
<body>
|
||||
<a href="index.html">back to home</a>
|
||||
<h1>404</h1>
|
||||
</body>
|
||||
</html>"#);
|
||||
</html>"#;
|
||||
|
||||
// response
|
||||
Ok(HttpResponse::build(StatusCode::NOT_FOUND)
|
||||
.content_type("text/html; charset=utf-8")
|
||||
.body(&html).unwrap())
|
||||
.body(html).unwrap())
|
||||
}
|
||||
|
||||
|
||||
@ -92,8 +93,9 @@ fn with_param(req: HttpRequest) -> Result<HttpResponse>
|
||||
}
|
||||
|
||||
fn main() {
|
||||
::std::env::set_var("RUST_LOG", "actix_web=info");
|
||||
let _ = env_logger::init();
|
||||
env::set_var("RUST_LOG", "actix_web=debug");
|
||||
env::set_var("RUST_BACKTRACE", "1");
|
||||
env_logger::init();
|
||||
let sys = actix::System::new("basic-example");
|
||||
|
||||
let addr = HttpServer::new(
|
||||
@ -121,6 +123,9 @@ fn main() {
|
||||
_ => httpcodes::HTTPNotFound,
|
||||
}
|
||||
}))
|
||||
.resource("/error.html", |r| r.f(|req| {
|
||||
error::ErrorBadRequest(io::Error::new(io::ErrorKind::Other, "test"))
|
||||
}))
|
||||
// static files
|
||||
.handler("/static/", fs::StaticFiles::new("../static/", true))
|
||||
// redirect
|
||||
|
@ -5,9 +5,9 @@ authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
workspace = "../.."
|
||||
|
||||
[dependencies]
|
||||
env_logger = "0.4"
|
||||
env_logger = "0.5"
|
||||
actix = "0.4"
|
||||
actix-web = { git = "https://github.com/actix/actix-web" }
|
||||
actix-web = { path = "../../" }
|
||||
|
||||
futures = "0.1"
|
||||
uuid = { version = "0.5", features = ["serde", "v4"] }
|
||||
|
@ -8,7 +8,7 @@ use diesel::prelude::*;
|
||||
use models;
|
||||
use schema;
|
||||
|
||||
/// This is db executor actor. We are going to run 3 of them in parallele.
|
||||
/// This is db executor actor. We are going to run 3 of them in parallel.
|
||||
pub struct DbExecutor(pub SqliteConnection);
|
||||
|
||||
/// This is only message that this actor can handle, but it is easy to extend number of
|
||||
|
@ -5,6 +5,6 @@ authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
workspace = "../.."
|
||||
|
||||
[dependencies]
|
||||
env_logger = "0.4"
|
||||
env_logger = "0.5"
|
||||
actix = "0.4"
|
||||
actix-web = { path = "../../" }
|
||||
|
@ -15,4 +15,4 @@ serde_derive = "1.0"
|
||||
json = "*"
|
||||
|
||||
actix = "0.4"
|
||||
actix-web = { git = "https://github.com/actix/actix-web" }
|
||||
actix-web = { path="../../" }
|
||||
|
@ -12,4 +12,4 @@ path = "src/main.rs"
|
||||
env_logger = "*"
|
||||
futures = "0.1"
|
||||
actix = "0.4"
|
||||
actix-web = { git = "https://github.com/actix/actix-web" }
|
||||
actix-web = { path="../../" }
|
||||
|
@ -6,6 +6,6 @@ workspace = "../.."
|
||||
|
||||
[dependencies]
|
||||
futures = "*"
|
||||
env_logger = "0.4"
|
||||
env_logger = "0.5"
|
||||
actix = "0.4"
|
||||
actix-web = { git = "https://github.com/actix/actix-web" }
|
||||
actix-web = { path = "../../" }
|
||||
|
@ -1,5 +1,5 @@
|
||||
#![cfg_attr(feature="cargo-clippy", allow(needless_pass_by_value))]
|
||||
//! There are two level of statfulness in actix-web. Application has state
|
||||
//! There are two level of statefulness in actix-web. Application has state
|
||||
//! that is shared across all handlers within same Application.
|
||||
//! And individual handler can have state.
|
||||
|
||||
@ -33,7 +33,7 @@ struct MyWebSocket {
|
||||
}
|
||||
|
||||
impl Actor for MyWebSocket {
|
||||
type Context = HttpContext<Self, AppState>;
|
||||
type Context = ws::WebsocketContext<Self, AppState>;
|
||||
}
|
||||
|
||||
impl Handler<ws::Message> for MyWebSocket {
|
||||
@ -43,9 +43,9 @@ impl Handler<ws::Message> for MyWebSocket {
|
||||
self.counter += 1;
|
||||
println!("WS({}): {:?}", self.counter, msg);
|
||||
match msg {
|
||||
ws::Message::Ping(msg) => ws::WsWriter::pong(ctx, &msg),
|
||||
ws::Message::Text(text) => ws::WsWriter::text(ctx, &text),
|
||||
ws::Message::Binary(bin) => ws::WsWriter::binary(ctx, bin),
|
||||
ws::Message::Ping(msg) => ctx.pong(&msg),
|
||||
ws::Message::Text(text) => ctx.text(&text),
|
||||
ws::Message::Binary(bin) => ctx.binary(bin),
|
||||
ws::Message::Closed | ws::Message::Error => {
|
||||
ctx.stop();
|
||||
}
|
||||
|
@ -5,7 +5,7 @@ authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
workspace = "../.."
|
||||
|
||||
[dependencies]
|
||||
env_logger = "0.4"
|
||||
env_logger = "0.5"
|
||||
actix = "0.4"
|
||||
actix-web = { git = "https://github.com/actix/actix-web" }
|
||||
actix-web = { path = "../../" }
|
||||
tera = "*"
|
||||
|
@ -9,6 +9,6 @@ name = "server"
|
||||
path = "src/main.rs"
|
||||
|
||||
[dependencies]
|
||||
env_logger = "0.4"
|
||||
env_logger = "0.5"
|
||||
actix = "^0.4.2"
|
||||
actix-web = { git = "https://github.com/actix/actix-web", features=["alpn"] }
|
||||
actix-web = { path = "../../", features=["alpn"] }
|
||||
|
@ -9,7 +9,7 @@ use std::io::Read;
|
||||
use actix_web::*;
|
||||
|
||||
|
||||
/// somple handle
|
||||
/// simple handle
|
||||
fn index(req: HttpRequest) -> Result<HttpResponse> {
|
||||
println!("{:?}", req);
|
||||
Ok(httpcodes::HTTPOk
|
||||
|
@ -26,4 +26,4 @@ serde_json = "1.0"
|
||||
serde_derive = "1.0"
|
||||
|
||||
actix = "^0.4.2"
|
||||
actix-web = { git = "https://github.com/actix/actix-web" }
|
||||
actix-web = { path="../../" }
|
||||
|
@ -16,8 +16,8 @@ Chat server listens for incoming tcp connections. Server can access several type
|
||||
* `\list` - list all available rooms
|
||||
* `\join name` - join room, if room does not exist, create new one
|
||||
* `\name name` - set session name
|
||||
* `some message` - just string, send messsage to all peers in same room
|
||||
* client has to send heartbeat `Ping` messages, if server does not receive a heartbeat message for 10 seconds connection gets droppped
|
||||
* `some message` - just string, send message to all peers in same room
|
||||
* client has to send heartbeat `Ping` messages, if server does not receive a heartbeat message for 10 seconds connection gets dropped
|
||||
|
||||
To start server use command: `cargo run --bin server`
|
||||
|
||||
|
@ -16,7 +16,7 @@ use codec::{ChatRequest, ChatResponse, ChatCodec};
|
||||
#[derive(Message)]
|
||||
pub struct Message(pub String);
|
||||
|
||||
/// `ChatSession` actor is responsible for tcp peer communitions.
|
||||
/// `ChatSession` actor is responsible for tcp peer communications.
|
||||
pub struct ChatSession {
|
||||
/// unique session id
|
||||
id: usize,
|
||||
@ -30,7 +30,7 @@ pub struct ChatSession {
|
||||
|
||||
impl Actor for ChatSession {
|
||||
/// For tcp communication we are going to use `FramedContext`.
|
||||
/// It is convinient wrapper around `Framed` object from `tokio_io`
|
||||
/// It is convenient wrapper around `Framed` object from `tokio_io`
|
||||
type Context = FramedContext<Self>;
|
||||
|
||||
fn started(&mut self, ctx: &mut Self::Context) {
|
||||
@ -149,7 +149,7 @@ impl ChatSession {
|
||||
}
|
||||
|
||||
|
||||
/// Define tcp server that will accept incomint tcp connection and create
|
||||
/// Define tcp server that will accept incoming tcp connection and create
|
||||
/// chat actors.
|
||||
pub struct TcpServer {
|
||||
chat: SyncAddress<ChatServer>,
|
||||
|
@ -134,3 +134,18 @@ fn index(req: HttpRequest) -> Result<&'static str> {
|
||||
```
|
||||
|
||||
In this example *BAD REQUEST* response get generated for `MyError` error.
|
||||
|
||||
## Error logging
|
||||
|
||||
Actix logs all errors with `WARN` log level. If log level set to `DEBUG`
|
||||
and `RUST_BACKTRACE` is enabled, backtrace get logged. The Error type uses
|
||||
cause's error backtrace if available, if the underlying failure does not provide
|
||||
a backtrace, a new backtrace is constructed pointing to that conversion point
|
||||
(rather than the origin of the error). This construction only happens if there
|
||||
is no underlying backtrace; if it does have a backtrace no new backtrace is constructed.
|
||||
|
||||
You can enable backtrace and debug logging with following command:
|
||||
|
||||
```
|
||||
>> RUST_BACKTRACE=1 RUST_LOG=actix_web=debug cargo run
|
||||
```
|
||||
|
@ -59,9 +59,11 @@ impl<S: 'static> PipelineHandler<S> for Inner<S> {
|
||||
|
||||
#[cfg(test)]
|
||||
impl<S: 'static> HttpApplication<S> {
|
||||
#[cfg(test)]
|
||||
pub(crate) fn run(&mut self, req: HttpRequest<S>) -> Reply {
|
||||
self.inner.borrow_mut().handle(req)
|
||||
}
|
||||
#[cfg(test)]
|
||||
pub(crate) fn prepare_request(&self, req: HttpRequest) -> HttpRequest<S> {
|
||||
req.with_state(Rc::clone(&self.state), self.router.clone())
|
||||
}
|
||||
@ -134,7 +136,7 @@ impl<S> Application<S> where S: 'static {
|
||||
/// Create application with specific state. Application can be
|
||||
/// configured with builder-like pattern.
|
||||
///
|
||||
/// State is shared with all reousrces within same application and could be
|
||||
/// State is shared with all resources within same application and could be
|
||||
/// accessed with `HttpRequest::state()` method.
|
||||
pub fn with_state(state: S) -> Application<S> {
|
||||
Application {
|
||||
@ -154,7 +156,7 @@ impl<S> Application<S> where S: 'static {
|
||||
/// Set application prefix
|
||||
///
|
||||
/// Only requests that matches application's prefix get processed by this application.
|
||||
/// Application prefix always contains laading "/" slash. If supplied prefix
|
||||
/// Application prefix always contains leading "/" slash. If supplied prefix
|
||||
/// does not contain leading slash, it get inserted. Prefix should
|
||||
/// consists valid path segments. i.e for application with
|
||||
/// prefix `/app` any request with following paths `/app`, `/app/` or `/app/test`
|
||||
@ -356,6 +358,40 @@ impl<S> Application<S> where S: 'static {
|
||||
middlewares: Rc::new(parts.middlewares),
|
||||
}
|
||||
}
|
||||
|
||||
/// Convenience method for creating `Box<HttpHandler>` instance.
|
||||
///
|
||||
/// This method is useful if you need to register several application instances
|
||||
/// with different state.
|
||||
///
|
||||
/// ```rust
|
||||
/// # use std::thread;
|
||||
/// # extern crate actix_web;
|
||||
/// use actix_web::*;
|
||||
///
|
||||
/// struct State1;
|
||||
///
|
||||
/// struct State2;
|
||||
///
|
||||
/// fn main() {
|
||||
/// # thread::spawn(|| {
|
||||
/// HttpServer::new(|| { vec![
|
||||
/// Application::with_state(State1)
|
||||
/// .prefix("/app1")
|
||||
/// .resource("/", |r| r.h(httpcodes::HTTPOk))
|
||||
/// .boxed(),
|
||||
/// Application::with_state(State2)
|
||||
/// .prefix("/app2")
|
||||
/// .resource("/", |r| r.h(httpcodes::HTTPOk))
|
||||
/// .boxed() ]})
|
||||
/// .bind("127.0.0.1:8080").unwrap()
|
||||
/// .run()
|
||||
/// # });
|
||||
/// }
|
||||
/// ```
|
||||
pub fn boxed(mut self) -> Box<HttpHandler> {
|
||||
Box::new(self.finish())
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: 'static> IntoHttpHandler for Application<S> {
|
||||
|
20
src/body.rs
20
src/body.rs
@ -1,4 +1,4 @@
|
||||
use std::fmt;
|
||||
use std::{fmt, mem};
|
||||
use std::rc::Rc;
|
||||
use std::sync::Arc;
|
||||
use bytes::{Bytes, BytesMut};
|
||||
@ -31,7 +31,7 @@ pub enum Binary {
|
||||
Bytes(Bytes),
|
||||
/// Static slice
|
||||
Slice(&'static [u8]),
|
||||
/// Shared stirng body
|
||||
/// Shared string body
|
||||
SharedString(Rc<String>),
|
||||
/// Shared string body
|
||||
#[doc(hidden)]
|
||||
@ -122,6 +122,22 @@ impl Binary {
|
||||
pub fn from_slice(s: &[u8]) -> Binary {
|
||||
Binary::Bytes(Bytes::from(s))
|
||||
}
|
||||
|
||||
/// Convert Binary to a Bytes instance
|
||||
pub fn take(&mut self) -> Bytes {
|
||||
mem::replace(self, Binary::Slice(b"")).into()
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for Binary {
|
||||
fn clone(&self) -> Binary {
|
||||
match *self {
|
||||
Binary::Bytes(ref bytes) => Binary::Bytes(bytes.clone()),
|
||||
Binary::Slice(slice) => Binary::Bytes(Bytes::from(slice)),
|
||||
Binary::SharedString(ref s) => Binary::Bytes(Bytes::from(s.as_str())),
|
||||
Binary::ArcSharedString(ref s) => Binary::Bytes(Bytes::from(s.as_str())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Into<Bytes> for Binary {
|
||||
|
@ -127,7 +127,7 @@ impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Indicate end of streamimng payload. Also this method calls `Self::close`.
|
||||
/// Indicate end of streaming payload. Also this method calls `Self::close`.
|
||||
#[inline]
|
||||
pub fn write_eof(&mut self) {
|
||||
self.add_frame(Frame::Chunk(None));
|
||||
|
220
src/error.rs
220
src/error.rs
@ -9,8 +9,8 @@ use std::error::Error as StdError;
|
||||
|
||||
use cookie;
|
||||
use httparse;
|
||||
use failure::Fail;
|
||||
use futures::Canceled;
|
||||
use failure::{Fail, Backtrace};
|
||||
use http2::Error as Http2Error;
|
||||
use http::{header, StatusCode, Error as HttpError};
|
||||
use http::uri::InvalidUriBytes;
|
||||
@ -22,6 +22,8 @@ use url::ParseError as UrlParseError;
|
||||
pub use cookie::{ParseError as CookieParseError};
|
||||
|
||||
use body::Body;
|
||||
use handler::Responder;
|
||||
use httprequest::HttpRequest;
|
||||
use httpresponse::HttpResponse;
|
||||
use httpcodes::{HTTPBadRequest, HTTPMethodNotAllowed, HTTPExpectationFailed};
|
||||
|
||||
@ -33,9 +35,9 @@ use httpcodes::{HTTPBadRequest, HTTPMethodNotAllowed, HTTPExpectationFailed};
|
||||
pub type Result<T, E=Error> = result::Result<T, E>;
|
||||
|
||||
/// General purpose actix web error
|
||||
#[derive(Fail, Debug)]
|
||||
pub struct Error {
|
||||
cause: Box<ResponseError>,
|
||||
backtrace: Option<Backtrace>,
|
||||
}
|
||||
|
||||
impl Error {
|
||||
@ -64,6 +66,16 @@ impl fmt::Display for Error {
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for Error {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
if let Some(bt) = self.cause.backtrace() {
|
||||
write!(f, "{:?}\n\n{:?}", &self.cause, bt)
|
||||
} else {
|
||||
write!(f, "{:?}\n\n{:?}", &self.cause, self.backtrace.as_ref().unwrap())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// `HttpResponse` for `Error`
|
||||
impl From<Error> for HttpResponse {
|
||||
fn from(err: Error) -> Self {
|
||||
@ -74,7 +86,12 @@ impl From<Error> for HttpResponse {
|
||||
/// `Error` for any error that implements `ResponseError`
|
||||
impl<T: ResponseError> From<T> for Error {
|
||||
fn from(err: T) -> Error {
|
||||
Error { cause: Box::new(err) }
|
||||
let backtrace = if err.backtrace().is_none() {
|
||||
Some(Backtrace::new())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
Error { cause: Box::new(err), backtrace: backtrace }
|
||||
}
|
||||
}
|
||||
|
||||
@ -320,7 +337,7 @@ pub enum WsHandshakeError {
|
||||
/// Only get method is allowed
|
||||
#[fail(display="Method not allowed")]
|
||||
GetMethodRequired,
|
||||
/// Ugrade header if not set to websocket
|
||||
/// Upgrade header if not set to websocket
|
||||
#[fail(display="Websocket upgrade is expected")]
|
||||
NoWebsocketUpgrade,
|
||||
/// Connection header is not set to upgrade
|
||||
@ -329,7 +346,7 @@ pub enum WsHandshakeError {
|
||||
/// Websocket version header is not set
|
||||
#[fail(display="Websocket version header is required")]
|
||||
NoVersionHeader,
|
||||
/// Unsupported websockt version
|
||||
/// Unsupported websocket version
|
||||
#[fail(display="Unsupported version")]
|
||||
UnsupportedVersion,
|
||||
/// Websocket key is not set or wrong
|
||||
@ -478,39 +495,10 @@ impl From<UrlParseError> for UrlGenerationError {
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! ERROR_WRAP {
|
||||
($type:ty, $status:expr) => {
|
||||
unsafe impl<T> Sync for $type {}
|
||||
unsafe impl<T> Send for $type {}
|
||||
|
||||
impl<T> $type {
|
||||
pub fn cause(&self) -> &T {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: fmt::Debug + 'static> Fail for $type {}
|
||||
impl<T: fmt::Debug + 'static> fmt::Display for $type {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "{:?}", self.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> ResponseError for $type
|
||||
where T: Send + Sync + fmt::Debug + 'static,
|
||||
{
|
||||
fn error_response(&self) -> HttpResponse {
|
||||
HttpResponse::new($status, Body::Empty)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper type that can wrap any error and generate *BAD REQUEST* response.
|
||||
/// Helper type that can wrap any error and generate custom response.
|
||||
///
|
||||
/// In following example any `io::Error` will be converted into "BAD REQUEST" response
|
||||
/// as oposite to *INNTERNAL SERVER ERROR* which is defined by default.
|
||||
/// as opposite to *INNTERNAL SERVER ERROR* which is defined by default.
|
||||
///
|
||||
/// ```rust
|
||||
/// # extern crate actix_web;
|
||||
@ -523,59 +511,133 @@ macro_rules! ERROR_WRAP {
|
||||
/// }
|
||||
/// # fn main() {}
|
||||
/// ```
|
||||
#[derive(Debug)]
|
||||
pub struct ErrorBadRequest<T>(pub T);
|
||||
ERROR_WRAP!(ErrorBadRequest<T>, StatusCode::BAD_REQUEST);
|
||||
pub struct InternalError<T> {
|
||||
cause: T,
|
||||
status: StatusCode,
|
||||
backtrace: Backtrace,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Helper type that can wrap any error and generate *UNAUTHORIZED* response.
|
||||
pub struct ErrorUnauthorized<T>(pub T);
|
||||
ERROR_WRAP!(ErrorUnauthorized<T>, StatusCode::UNAUTHORIZED);
|
||||
unsafe impl<T> Sync for InternalError<T> {}
|
||||
unsafe impl<T> Send for InternalError<T> {}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Helper type that can wrap any error and generate *FORBIDDEN* response.
|
||||
pub struct ErrorForbidden<T>(pub T);
|
||||
ERROR_WRAP!(ErrorForbidden<T>, StatusCode::FORBIDDEN);
|
||||
impl<T> InternalError<T> {
|
||||
pub fn new(err: T, status: StatusCode) -> Self {
|
||||
InternalError {
|
||||
cause: err,
|
||||
status: status,
|
||||
backtrace: Backtrace::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Helper type that can wrap any error and generate *NOT FOUND* response.
|
||||
pub struct ErrorNotFound<T>(pub T);
|
||||
ERROR_WRAP!(ErrorNotFound<T>, StatusCode::NOT_FOUND);
|
||||
impl<T> Fail for InternalError<T>
|
||||
where T: Send + Sync + fmt::Debug + 'static
|
||||
{
|
||||
fn backtrace(&self) -> Option<&Backtrace> {
|
||||
Some(&self.backtrace)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Helper type that can wrap any error and generate *METHOD NOT ALLOWED* response.
|
||||
pub struct ErrorMethodNotAllowed<T>(pub T);
|
||||
ERROR_WRAP!(ErrorMethodNotAllowed<T>, StatusCode::METHOD_NOT_ALLOWED);
|
||||
impl<T> fmt::Debug for InternalError<T>
|
||||
where T: Send + Sync + fmt::Debug + 'static
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
fmt::Debug::fmt(&self.cause, f)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Helper type that can wrap any error and generate *REQUEST TIMEOUT* response.
|
||||
pub struct ErrorRequestTimeout<T>(pub T);
|
||||
ERROR_WRAP!(ErrorRequestTimeout<T>, StatusCode::REQUEST_TIMEOUT);
|
||||
impl<T> fmt::Display for InternalError<T>
|
||||
where T: Send + Sync + fmt::Debug + 'static
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
fmt::Debug::fmt(&self.cause, f)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Helper type that can wrap any error and generate *CONFLICT* response.
|
||||
pub struct ErrorConflict<T>(pub T);
|
||||
ERROR_WRAP!(ErrorConflict<T>, StatusCode::CONFLICT);
|
||||
impl<T> ResponseError for InternalError<T>
|
||||
where T: Send + Sync + fmt::Debug + 'static
|
||||
{
|
||||
fn error_response(&self) -> HttpResponse {
|
||||
HttpResponse::new(self.status, Body::Empty)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Helper type that can wrap any error and generate *GONE* response.
|
||||
pub struct ErrorGone<T>(pub T);
|
||||
ERROR_WRAP!(ErrorGone<T>, StatusCode::GONE);
|
||||
impl<T> Responder for InternalError<T>
|
||||
where T: Send + Sync + fmt::Debug + 'static
|
||||
{
|
||||
type Item = HttpResponse;
|
||||
type Error = Error;
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Helper type that can wrap any error and generate *PRECONDITION FAILED* response.
|
||||
pub struct ErrorPreconditionFailed<T>(pub T);
|
||||
ERROR_WRAP!(ErrorPreconditionFailed<T>, StatusCode::PRECONDITION_FAILED);
|
||||
fn respond_to(self, _: HttpRequest) -> Result<HttpResponse, Error> {
|
||||
Err(self.into())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Helper type that can wrap any error and generate *EXPECTATION FAILED* response.
|
||||
pub struct ErrorExpectationFailed<T>(pub T);
|
||||
ERROR_WRAP!(ErrorExpectationFailed<T>, StatusCode::EXPECTATION_FAILED);
|
||||
/// Helper function that creates wrapper of any error and generate *BAD REQUEST* response.
|
||||
#[allow(non_snake_case)]
|
||||
pub fn ErrorBadRequest<T>(err: T) -> InternalError<T> {
|
||||
InternalError::new(err, StatusCode::BAD_REQUEST)
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Helper type that can wrap any error and generate *INTERNAL SERVER ERROR* response.
|
||||
pub struct ErrorInternalServerError<T>(pub T);
|
||||
ERROR_WRAP!(ErrorInternalServerError<T>, StatusCode::INTERNAL_SERVER_ERROR);
|
||||
/// Helper function that creates wrapper of any error and generate *UNAUTHORIZED* response.
|
||||
#[allow(non_snake_case)]
|
||||
pub fn ErrorUnauthorized<T>(err: T) -> InternalError<T> {
|
||||
InternalError::new(err, StatusCode::UNAUTHORIZED)
|
||||
}
|
||||
|
||||
/// Helper function that creates wrapper of any error and generate *FORBIDDEN* response.
|
||||
#[allow(non_snake_case)]
|
||||
pub fn ErrorForbidden<T>(err: T) -> InternalError<T> {
|
||||
InternalError::new(err, StatusCode::FORBIDDEN)
|
||||
}
|
||||
|
||||
/// Helper function that creates wrapper of any error and generate *NOT FOUND* response.
|
||||
#[allow(non_snake_case)]
|
||||
pub fn ErrorNotFound<T>(err: T) -> InternalError<T> {
|
||||
InternalError::new(err, StatusCode::NOT_FOUND)
|
||||
}
|
||||
|
||||
/// Helper function that creates wrapper of any error and generate *METHOD NOT ALLOWED* response.
|
||||
#[allow(non_snake_case)]
|
||||
pub fn ErrorMethodNotAllowed<T>(err: T) -> InternalError<T> {
|
||||
InternalError::new(err, StatusCode::METHOD_NOT_ALLOWED)
|
||||
}
|
||||
|
||||
/// Helper function that creates wrapper of any error and generate *REQUEST TIMEOUT* response.
|
||||
#[allow(non_snake_case)]
|
||||
pub fn ErrorRequestTimeout<T>(err: T) -> InternalError<T> {
|
||||
InternalError::new(err, StatusCode::REQUEST_TIMEOUT)
|
||||
}
|
||||
|
||||
/// Helper function that creates wrapper of any error and generate *CONFLICT* response.
|
||||
#[allow(non_snake_case)]
|
||||
pub fn ErrorConflict<T>(err: T) -> InternalError<T> {
|
||||
InternalError::new(err, StatusCode::CONFLICT)
|
||||
}
|
||||
|
||||
/// Helper function that creates wrapper of any error and generate *GONE* response.
|
||||
#[allow(non_snake_case)]
|
||||
pub fn ErrorGone<T>(err: T) -> InternalError<T> {
|
||||
InternalError::new(err, StatusCode::GONE)
|
||||
}
|
||||
|
||||
/// Helper function that creates wrapper of any error and generate *PRECONDITION FAILED* response.
|
||||
#[allow(non_snake_case)]
|
||||
pub fn ErrorPreconditionFailed<T>(err: T) -> InternalError<T> {
|
||||
InternalError::new(err, StatusCode::PRECONDITION_FAILED)
|
||||
}
|
||||
|
||||
/// Helper function that creates wrapper of any error and generate *EXPECTATION FAILED* response.
|
||||
#[allow(non_snake_case)]
|
||||
pub fn ErrorExpectationFailed<T>(err: T) -> InternalError<T> {
|
||||
InternalError::new(err, StatusCode::EXPECTATION_FAILED)
|
||||
}
|
||||
|
||||
/// Helper function that creates wrapper of any error and generate *INTERNAL SERVER ERROR* response.
|
||||
#[allow(non_snake_case)]
|
||||
pub fn ErrorInternalServerError<T>(err: T) -> InternalError<T> {
|
||||
InternalError::new(err, StatusCode::INTERNAL_SERVER_ERROR)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
@ -9,7 +9,7 @@ use error::Error;
|
||||
use httprequest::HttpRequest;
|
||||
use httpresponse::HttpResponse;
|
||||
|
||||
/// Trait defines object that could be regestered as route handler
|
||||
/// Trait defines object that could be registered as route handler
|
||||
#[allow(unused_variables)]
|
||||
pub trait Handler<S>: 'static {
|
||||
|
||||
@ -35,7 +35,7 @@ pub trait Responder {
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
/// Convinience trait that convert `Future` object into `Boxed` future
|
||||
/// Convenience trait that convert `Future` object into `Boxed` future
|
||||
pub trait AsyncResponder<I, E>: Sized {
|
||||
fn responder(self) -> Box<Future<Item=I, Error=E>>;
|
||||
}
|
||||
@ -193,7 +193,7 @@ impl<I, E> Responder for Box<Future<Item=I, Error=E>>
|
||||
}
|
||||
}
|
||||
|
||||
/// Trait defines object that could be regestered as resource route
|
||||
/// Trait defines object that could be registered as resource route
|
||||
pub(crate) trait RouteHandler<S>: 'static {
|
||||
fn handle(&mut self, req: HttpRequest<S>) -> Reply;
|
||||
}
|
||||
@ -341,7 +341,7 @@ impl Default for NormalizePath {
|
||||
}
|
||||
|
||||
impl NormalizePath {
|
||||
/// Create new `NoramlizePath` instance
|
||||
/// Create new `NormalizePath` instance
|
||||
pub fn new(append: bool, merge: bool, redirect: StatusCode) -> NormalizePath {
|
||||
NormalizePath {
|
||||
append: append,
|
||||
|
@ -66,84 +66,6 @@ impl fmt::Write for CachedDate {
|
||||
}
|
||||
}
|
||||
|
||||
/// Internal use only! unsafe
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct SharedBytesPool(RefCell<VecDeque<Rc<BytesMut>>>);
|
||||
|
||||
impl SharedBytesPool {
|
||||
pub fn new() -> SharedBytesPool {
|
||||
SharedBytesPool(RefCell::new(VecDeque::with_capacity(128)))
|
||||
}
|
||||
|
||||
pub fn get_bytes(&self) -> Rc<BytesMut> {
|
||||
if let Some(bytes) = self.0.borrow_mut().pop_front() {
|
||||
bytes
|
||||
} else {
|
||||
Rc::new(BytesMut::new())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn release_bytes(&self, mut bytes: Rc<BytesMut>) {
|
||||
let v = &mut self.0.borrow_mut();
|
||||
if v.len() < 128 {
|
||||
Rc::get_mut(&mut bytes).unwrap().take();
|
||||
v.push_front(bytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct SharedBytes(
|
||||
Option<Rc<BytesMut>>, Option<Rc<SharedBytesPool>>);
|
||||
|
||||
impl Drop for SharedBytes {
|
||||
fn drop(&mut self) {
|
||||
if let Some(ref pool) = self.1 {
|
||||
if let Some(bytes) = self.0.take() {
|
||||
if Rc::strong_count(&bytes) == 1 {
|
||||
pool.release_bytes(bytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SharedBytes {
|
||||
|
||||
pub fn empty() -> Self {
|
||||
SharedBytes(None, None)
|
||||
}
|
||||
|
||||
pub fn new(bytes: Rc<BytesMut>, pool: Rc<SharedBytesPool>) -> SharedBytes {
|
||||
SharedBytes(Some(bytes), Some(pool))
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
#[allow(mutable_transmutes)]
|
||||
#[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref, inline_always))]
|
||||
pub fn get_mut(&self) -> &mut BytesMut {
|
||||
let r: &BytesMut = self.0.as_ref().unwrap().as_ref();
|
||||
unsafe{mem::transmute(r)}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn get_ref(&self) -> &BytesMut {
|
||||
self.0.as_ref().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for SharedBytes {
|
||||
fn default() -> Self {
|
||||
SharedBytes(Some(Rc::new(BytesMut::new())), None)
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for SharedBytes {
|
||||
fn clone(&self) -> SharedBytes {
|
||||
SharedBytes(self.0.clone(), self.1.clone())
|
||||
}
|
||||
}
|
||||
|
||||
/// Internal use only! unsafe
|
||||
pub(crate) struct SharedMessagePool(RefCell<VecDeque<Rc<HttpMessage>>>);
|
||||
|
||||
|
@ -222,7 +222,7 @@ impl<S> HttpRequest<S> {
|
||||
self.uri().path()
|
||||
}
|
||||
|
||||
/// Get *ConnectionInfo* for currect request.
|
||||
/// Get *ConnectionInfo* for correct request.
|
||||
pub fn connection_info(&self) -> &ConnectionInfo {
|
||||
if self.as_ref().info.is_none() {
|
||||
let info: ConnectionInfo<'static> = unsafe{
|
||||
@ -278,7 +278,7 @@ impl<S> HttpRequest<S> {
|
||||
|
||||
/// Peer socket address
|
||||
///
|
||||
/// Peer address is actuall socket address, if proxy is used in front of
|
||||
/// Peer address is actual socket address, if proxy is used in front of
|
||||
/// actix http server, then peer address would be address of this proxy.
|
||||
///
|
||||
/// To get client connection information `connection_info()` method should be used.
|
||||
|
@ -164,8 +164,8 @@ impl HttpResponse {
|
||||
|
||||
/// Content encoding
|
||||
#[inline]
|
||||
pub fn content_encoding(&self) -> &ContentEncoding {
|
||||
&self.get_ref().encoding
|
||||
pub fn content_encoding(&self) -> ContentEncoding {
|
||||
self.get_ref().encoding
|
||||
}
|
||||
|
||||
/// Set content encoding
|
||||
@ -423,8 +423,8 @@ impl HttpResponseBuilder {
|
||||
}
|
||||
|
||||
/// This method calls provided closure with builder reference if value is Some.
|
||||
pub fn if_some<T, F>(&mut self, value: Option<&T>, f: F) -> &mut Self
|
||||
where F: FnOnce(&T, &mut HttpResponseBuilder)
|
||||
pub fn if_some<T, F>(&mut self, value: Option<T>, f: F) -> &mut Self
|
||||
where F: FnOnce(T, &mut HttpResponseBuilder)
|
||||
{
|
||||
if let Some(val) = value {
|
||||
f(val, self);
|
||||
@ -812,11 +812,11 @@ mod tests {
|
||||
#[test]
|
||||
fn test_content_encoding() {
|
||||
let resp = HttpResponse::build(StatusCode::OK).finish().unwrap();
|
||||
assert_eq!(*resp.content_encoding(), ContentEncoding::Auto);
|
||||
assert_eq!(resp.content_encoding(), ContentEncoding::Auto);
|
||||
|
||||
let resp = HttpResponse::build(StatusCode::OK)
|
||||
.content_encoding(ContentEncoding::Br).finish().unwrap();
|
||||
assert_eq!(*resp.content_encoding(), ContentEncoding::Br);
|
||||
assert_eq!(resp.content_encoding(), ContentEncoding::Br);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -214,7 +214,7 @@ impl Cors {
|
||||
/// This method register cors middleware with resource and
|
||||
/// adds route for *OPTIONS* preflight requests.
|
||||
///
|
||||
/// It is possible to register *Cors* middlware with `Resource::middleware()`
|
||||
/// It is possible to register *Cors* middleware with `Resource::middleware()`
|
||||
/// method, but in that case *Cors* middleware wont be able to handle *OPTIONS*
|
||||
/// requests.
|
||||
pub fn register<S: 'static>(self, resource: &mut Resource<S>) {
|
||||
@ -295,16 +295,23 @@ impl<S> Middleware<S> for Cors {
|
||||
self.validate_allowed_method(req)?;
|
||||
self.validate_allowed_headers(req)?;
|
||||
|
||||
// allowed headers
|
||||
let headers = if let Some(headers) = self.headers.as_ref() {
|
||||
Some(HeaderValue::try_from(&headers.iter().fold(
|
||||
String::new(), |s, v| s + "," + v.as_str()).as_str()[1..]).unwrap())
|
||||
} else if let Some(hdr) = req.headers().get(header::ACCESS_CONTROL_REQUEST_HEADERS) {
|
||||
Some(hdr.clone())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
Ok(Started::Response(
|
||||
HTTPOk.build()
|
||||
.if_some(self.max_age.as_ref(), |max_age, resp| {
|
||||
let _ = resp.header(
|
||||
header::ACCESS_CONTROL_MAX_AGE, format!("{}", max_age).as_str());})
|
||||
.if_some(self.headers.as_ref(), |headers, resp| {
|
||||
let _ = resp.header(
|
||||
header::ACCESS_CONTROL_ALLOW_HEADERS,
|
||||
&headers.iter().fold(
|
||||
String::new(), |s, v| s + "," + v.as_str()).as_str()[1..]);})
|
||||
.if_some(headers, |headers, resp| {
|
||||
let _ = resp.header(header::ACCESS_CONTROL_ALLOW_HEADERS, headers); })
|
||||
.if_true(self.origins.is_all(), |resp| {
|
||||
if self.send_wildcard {
|
||||
resp.header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*");
|
||||
|
@ -217,7 +217,7 @@ pub struct CookieSession {
|
||||
inner: Rc<CookieSessionInner>,
|
||||
}
|
||||
|
||||
/// Errors that can occure during handling cookie session
|
||||
/// Errors that can occur during handling cookie session
|
||||
#[derive(Fail, Debug)]
|
||||
pub enum CookieSessionError {
|
||||
/// Size of the serialized session is greater than 4000 bytes.
|
||||
|
@ -6,7 +6,7 @@ use std::slice::Iter;
|
||||
use std::borrow::Cow;
|
||||
use smallvec::SmallVec;
|
||||
|
||||
use error::{ResponseError, UriSegmentError, ErrorBadRequest};
|
||||
use error::{ResponseError, UriSegmentError, InternalError, ErrorBadRequest};
|
||||
|
||||
|
||||
/// A trait to abstract the idea of creating a new instance of a type from a path parameter.
|
||||
@ -77,7 +77,7 @@ impl<'a> Params<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Return iterator to items in paramter container
|
||||
/// Return iterator to items in parameter container
|
||||
pub fn iter(&self) -> Iter<(Cow<'a, str>, Cow<'a, str>)> {
|
||||
self.0.iter()
|
||||
}
|
||||
@ -141,7 +141,7 @@ impl FromParam for PathBuf {
|
||||
macro_rules! FROM_STR {
|
||||
($type:ty) => {
|
||||
impl FromParam for $type {
|
||||
type Err = ErrorBadRequest<<$type as FromStr>::Err>;
|
||||
type Err = InternalError<<$type as FromStr>::Err>;
|
||||
|
||||
fn from_param(val: &str) -> Result<Self, Self::Err> {
|
||||
<$type as FromStr>::from_str(val).map_err(ErrorBadRequest)
|
||||
|
@ -3,6 +3,7 @@ use std::rc::Rc;
|
||||
use std::cell::RefCell;
|
||||
use std::marker::PhantomData;
|
||||
|
||||
use log::Level::Debug;
|
||||
use futures::{Async, Poll, Future, Stream};
|
||||
use futures::unsync::oneshot;
|
||||
|
||||
@ -56,7 +57,7 @@ impl<S: 'static, H: PipelineHandler<S>> PipelineState<S, H> {
|
||||
|
||||
struct PipelineInfo<S> {
|
||||
req: HttpRequest<S>,
|
||||
count: usize,
|
||||
count: u16,
|
||||
mws: Rc<Vec<Box<Middleware<S>>>>,
|
||||
context: Option<Box<ActorHttpContext>>,
|
||||
error: Option<Error>,
|
||||
@ -210,14 +211,14 @@ impl<S: 'static, H: PipelineHandler<S>> StartMiddlewares<S, H> {
|
||||
|
||||
fn init(info: &mut PipelineInfo<S>, handler: Rc<RefCell<H>>) -> PipelineState<S, H> {
|
||||
// execute middlewares, we need this stage because middlewares could be non-async
|
||||
// and we can move to next state immidietly
|
||||
let len = info.mws.len();
|
||||
// and we can move to next state immediately
|
||||
let len = info.mws.len() as u16;
|
||||
loop {
|
||||
if info.count == len {
|
||||
let reply = handler.borrow_mut().handle(info.req.clone());
|
||||
return WaitingResponse::init(info, reply)
|
||||
} else {
|
||||
match info.mws[info.count].start(&mut info.req) {
|
||||
match info.mws[info.count as usize].start(&mut info.req) {
|
||||
Ok(Started::Done) =>
|
||||
info.count += 1,
|
||||
Ok(Started::Response(resp)) =>
|
||||
@ -246,7 +247,7 @@ impl<S: 'static, H: PipelineHandler<S>> StartMiddlewares<S, H> {
|
||||
}
|
||||
|
||||
fn poll(&mut self, info: &mut PipelineInfo<S>) -> Option<PipelineState<S, H>> {
|
||||
let len = info.mws.len();
|
||||
let len = info.mws.len() as u16;
|
||||
'outer: loop {
|
||||
match self.fut.as_mut().unwrap().poll() {
|
||||
Ok(Async::NotReady) => return None,
|
||||
@ -260,7 +261,7 @@ impl<S: 'static, H: PipelineHandler<S>> StartMiddlewares<S, H> {
|
||||
return Some(WaitingResponse::init(info, reply));
|
||||
} else {
|
||||
loop {
|
||||
match info.mws[info.count].start(info.req_mut()) {
|
||||
match info.mws[info.count as usize].start(info.req_mut()) {
|
||||
Ok(Started::Done) =>
|
||||
info.count += 1,
|
||||
Ok(Started::Response(resp)) => {
|
||||
@ -334,7 +335,7 @@ impl<S: 'static, H> RunMiddlewares<S, H> {
|
||||
loop {
|
||||
resp = match info.mws[curr].response(info.req_mut(), resp) {
|
||||
Err(err) => {
|
||||
info.count = curr + 1;
|
||||
info.count = (curr + 1) as u16;
|
||||
return ProcessResponse::init(err.into())
|
||||
}
|
||||
Ok(Response::Done(r)) => {
|
||||
@ -458,6 +459,13 @@ impl<S: 'static, H> ProcessResponse<S, H> {
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(err) = self.resp.error() {
|
||||
warn!("Error occured during request handling: {}", err);
|
||||
if log_enabled!(Debug) {
|
||||
debug!("{:?}", err);
|
||||
}
|
||||
}
|
||||
|
||||
match self.resp.replace_body(Body::Empty) {
|
||||
Body::Streaming(stream) =>
|
||||
self.iostate = IOState::Payload(stream),
|
||||
@ -480,7 +488,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
|
||||
},
|
||||
Ok(Async::Ready(Some(chunk))) => {
|
||||
self.iostate = IOState::Payload(body);
|
||||
match io.write(chunk.as_ref()) {
|
||||
match io.write(chunk.into()) {
|
||||
Err(err) => {
|
||||
info.error = Some(err.into());
|
||||
return Ok(FinishingMiddlewares::init(info, self.resp))
|
||||
@ -522,7 +530,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
|
||||
break 'outter
|
||||
},
|
||||
Frame::Chunk(Some(chunk)) => {
|
||||
match io.write(chunk.as_ref()) {
|
||||
match io.write(chunk) {
|
||||
Err(err) => {
|
||||
info.error = Some(err.into());
|
||||
return Ok(
|
||||
@ -575,16 +583,6 @@ impl<S: 'static, H> ProcessResponse<S, H> {
|
||||
if self.running == RunningState::Paused || self.drain.is_some() {
|
||||
match io.poll_completed(false) {
|
||||
Ok(Async::Ready(_)) => {
|
||||
match io.flush() {
|
||||
Ok(Async::Ready(_)) => (),
|
||||
Ok(Async::NotReady) => return Err(PipelineState::Response(self)),
|
||||
Err(err) => {
|
||||
debug!("Error sending data: {}", err);
|
||||
info.error = Some(err.into());
|
||||
return Ok(FinishingMiddlewares::init(info, self.resp))
|
||||
}
|
||||
}
|
||||
|
||||
self.running.resume();
|
||||
|
||||
// resolve drain futures
|
||||
@ -596,7 +594,6 @@ impl<S: 'static, H> ProcessResponse<S, H> {
|
||||
},
|
||||
Ok(Async::NotReady) => return Err(PipelineState::Response(self)),
|
||||
Err(err) => {
|
||||
debug!("Error sending data: {}", err);
|
||||
info.error = Some(err.into());
|
||||
return Ok(FinishingMiddlewares::init(info, self.resp))
|
||||
}
|
||||
@ -609,7 +606,6 @@ impl<S: 'static, H> ProcessResponse<S, H> {
|
||||
match io.write_eof() {
|
||||
Ok(_) => (),
|
||||
Err(err) => {
|
||||
debug!("Error sending data: {}", err);
|
||||
info.error = Some(err.into());
|
||||
return Ok(FinishingMiddlewares::init(info, self.resp))
|
||||
}
|
||||
@ -671,7 +667,7 @@ impl<S: 'static, H> FinishingMiddlewares<S, H> {
|
||||
self.fut = None;
|
||||
info.count -= 1;
|
||||
|
||||
match info.mws[info.count].finish(info.req_mut(), &self.resp) {
|
||||
match info.mws[info.count as usize].finish(info.req_mut(), &self.resp) {
|
||||
Finished::Done => {
|
||||
if info.count == 0 {
|
||||
return Some(Completed::init(info))
|
||||
@ -692,6 +688,10 @@ impl<S, H> Completed<S, H> {
|
||||
|
||||
#[inline]
|
||||
fn init(info: &mut PipelineInfo<S>) -> PipelineState<S, H> {
|
||||
if let Some(ref err) = info.error {
|
||||
error!("Error occured during request handling: {}", err);
|
||||
}
|
||||
|
||||
if info.context.is_none() {
|
||||
PipelineState::None
|
||||
} else {
|
||||
|
@ -19,7 +19,7 @@ use httpresponse::HttpResponse;
|
||||
/// Route uses builder-like pattern for configuration.
|
||||
/// During request handling, resource object iterate through all routes
|
||||
/// and check all predicates for specific route, if request matches all predicates route
|
||||
/// route considired matched and route handler get called.
|
||||
/// route considered matched and route handler get called.
|
||||
///
|
||||
/// ```rust
|
||||
/// # extern crate actix_web;
|
||||
|
@ -3,7 +3,7 @@ use std::io::{Read, Write};
|
||||
use std::fmt::Write as FmtWrite;
|
||||
use std::str::FromStr;
|
||||
|
||||
use http::Version;
|
||||
use http::{Version, Method, HttpTryFrom};
|
||||
use http::header::{HeaderMap, HeaderValue,
|
||||
ACCEPT_ENCODING, CONNECTION,
|
||||
CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING};
|
||||
@ -16,11 +16,13 @@ use bytes::{Bytes, BytesMut, BufMut, Writer};
|
||||
use headers::ContentEncoding;
|
||||
use body::{Body, Binary};
|
||||
use error::PayloadError;
|
||||
use helpers::SharedBytes;
|
||||
use httprequest::HttpMessage;
|
||||
use httpresponse::HttpResponse;
|
||||
use payload::{PayloadSender, PayloadWriter};
|
||||
|
||||
use super::shared::SharedBytes;
|
||||
|
||||
|
||||
impl ContentEncoding {
|
||||
|
||||
#[inline]
|
||||
@ -344,15 +346,17 @@ impl PayloadEncoder {
|
||||
pub fn new(buf: SharedBytes, req: &HttpMessage, resp: &mut HttpResponse) -> PayloadEncoder {
|
||||
let version = resp.version().unwrap_or_else(|| req.version);
|
||||
let mut body = resp.replace_body(Body::Empty);
|
||||
let response_encoding = resp.content_encoding();
|
||||
let has_body = match body {
|
||||
Body::Empty => false,
|
||||
Body::Binary(ref bin) => bin.len() >= 512,
|
||||
Body::Binary(ref bin) =>
|
||||
!(response_encoding == ContentEncoding::Auto && bin.len() < 96),
|
||||
_ => true,
|
||||
};
|
||||
|
||||
// Enable content encoding only if response does not contain Content-Encoding header
|
||||
let mut encoding = if has_body {
|
||||
let encoding = match *resp.content_encoding() {
|
||||
let encoding = match response_encoding {
|
||||
ContentEncoding::Auto => {
|
||||
// negotiate content-encoding
|
||||
if let Some(val) = req.headers.get(ACCEPT_ENCODING) {
|
||||
@ -376,10 +380,12 @@ impl PayloadEncoder {
|
||||
ContentEncoding::Identity
|
||||
};
|
||||
|
||||
let transfer = match body {
|
||||
let mut transfer = match body {
|
||||
Body::Empty => {
|
||||
resp.headers_mut().remove(CONTENT_LENGTH);
|
||||
TransferEncoding::eof(buf)
|
||||
if req.method != Method::HEAD {
|
||||
resp.headers_mut().remove(CONTENT_LENGTH);
|
||||
}
|
||||
TransferEncoding::length(0, buf)
|
||||
},
|
||||
Body::Binary(ref mut bytes) => {
|
||||
if encoding.is_compression() {
|
||||
@ -396,13 +402,20 @@ impl PayloadEncoder {
|
||||
ContentEncoding::Auto => unreachable!()
|
||||
};
|
||||
// TODO return error!
|
||||
let _ = enc.write(bytes.as_ref());
|
||||
let _ = enc.write(bytes.clone());
|
||||
let _ = enc.write_eof();
|
||||
|
||||
*bytes = Binary::from(tmp.get_mut().take());
|
||||
*bytes = Binary::from(tmp.take());
|
||||
encoding = ContentEncoding::Identity;
|
||||
}
|
||||
resp.headers_mut().remove(CONTENT_LENGTH);
|
||||
if req.method == Method::HEAD {
|
||||
let mut b = BytesMut::new();
|
||||
let _ = write!(b, "{}", bytes.len());
|
||||
resp.headers_mut().insert(
|
||||
CONTENT_LENGTH, HeaderValue::try_from(b.freeze()).unwrap());
|
||||
} else {
|
||||
resp.headers_mut().remove(CONTENT_LENGTH);
|
||||
}
|
||||
TransferEncoding::eof(buf)
|
||||
}
|
||||
Body::Streaming(_) | Body::Actor(_) => {
|
||||
@ -423,7 +436,12 @@ impl PayloadEncoder {
|
||||
}
|
||||
}
|
||||
};
|
||||
resp.replace_body(body);
|
||||
//
|
||||
if req.method == Method::HEAD {
|
||||
transfer.kind = TransferEncodingKind::Length(0);
|
||||
} else {
|
||||
resp.replace_body(body);
|
||||
}
|
||||
|
||||
PayloadEncoder(
|
||||
match encoding {
|
||||
@ -503,16 +521,6 @@ impl PayloadEncoder {
|
||||
|
||||
impl PayloadEncoder {
|
||||
|
||||
#[inline]
|
||||
pub fn len(&self) -> usize {
|
||||
self.0.get_ref().len()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn get_mut(&mut self) -> &mut BytesMut {
|
||||
self.0.get_mut()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn is_eof(&self) -> bool {
|
||||
self.0.is_eof()
|
||||
@ -520,7 +528,7 @@ impl PayloadEncoder {
|
||||
|
||||
#[cfg_attr(feature = "cargo-clippy", allow(inline_always))]
|
||||
#[inline(always)]
|
||||
pub fn write(&mut self, payload: &[u8]) -> Result<(), io::Error> {
|
||||
pub fn write(&mut self, payload: Binary) -> Result<(), io::Error> {
|
||||
self.0.write(payload)
|
||||
}
|
||||
|
||||
@ -543,42 +551,10 @@ impl ContentEncoder {
|
||||
#[inline]
|
||||
pub fn is_eof(&self) -> bool {
|
||||
match *self {
|
||||
ContentEncoder::Br(ref encoder) =>
|
||||
encoder.get_ref().is_eof(),
|
||||
ContentEncoder::Deflate(ref encoder) =>
|
||||
encoder.get_ref().is_eof(),
|
||||
ContentEncoder::Gzip(ref encoder) =>
|
||||
encoder.get_ref().is_eof(),
|
||||
ContentEncoder::Identity(ref encoder) =>
|
||||
encoder.is_eof(),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn get_ref(&self) -> &BytesMut {
|
||||
match *self {
|
||||
ContentEncoder::Br(ref encoder) =>
|
||||
encoder.get_ref().buffer.get_ref(),
|
||||
ContentEncoder::Deflate(ref encoder) =>
|
||||
encoder.get_ref().buffer.get_ref(),
|
||||
ContentEncoder::Gzip(ref encoder) =>
|
||||
encoder.get_ref().buffer.get_ref(),
|
||||
ContentEncoder::Identity(ref encoder) =>
|
||||
encoder.buffer.get_ref(),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn get_mut(&mut self) -> &mut BytesMut {
|
||||
match *self {
|
||||
ContentEncoder::Br(ref mut encoder) =>
|
||||
encoder.get_mut().buffer.get_mut(),
|
||||
ContentEncoder::Deflate(ref mut encoder) =>
|
||||
encoder.get_mut().buffer.get_mut(),
|
||||
ContentEncoder::Gzip(ref mut encoder) =>
|
||||
encoder.get_mut().buffer.get_mut(),
|
||||
ContentEncoder::Identity(ref mut encoder) =>
|
||||
encoder.buffer.get_mut(),
|
||||
ContentEncoder::Br(ref encoder) => encoder.get_ref().is_eof(),
|
||||
ContentEncoder::Deflate(ref encoder) => encoder.get_ref().is_eof(),
|
||||
ContentEncoder::Gzip(ref encoder) => encoder.get_ref().is_eof(),
|
||||
ContentEncoder::Identity(ref encoder) => encoder.is_eof(),
|
||||
}
|
||||
}
|
||||
|
||||
@ -629,10 +605,10 @@ impl ContentEncoder {
|
||||
|
||||
#[cfg_attr(feature = "cargo-clippy", allow(inline_always))]
|
||||
#[inline(always)]
|
||||
pub fn write(&mut self, data: &[u8]) -> Result<(), io::Error> {
|
||||
pub fn write(&mut self, data: Binary) -> Result<(), io::Error> {
|
||||
match *self {
|
||||
ContentEncoder::Br(ref mut encoder) => {
|
||||
match encoder.write(data) {
|
||||
match encoder.write(data.as_ref()) {
|
||||
Ok(_) =>
|
||||
encoder.flush(),
|
||||
Err(err) => {
|
||||
@ -642,7 +618,7 @@ impl ContentEncoder {
|
||||
}
|
||||
},
|
||||
ContentEncoder::Gzip(ref mut encoder) => {
|
||||
match encoder.write(data) {
|
||||
match encoder.write(data.as_ref()) {
|
||||
Ok(_) =>
|
||||
encoder.flush(),
|
||||
Err(err) => {
|
||||
@ -652,7 +628,7 @@ impl ContentEncoder {
|
||||
}
|
||||
}
|
||||
ContentEncoder::Deflate(ref mut encoder) => {
|
||||
match encoder.write(data) {
|
||||
match encoder.write(data.as_ref()) {
|
||||
Ok(_) =>
|
||||
encoder.flush(),
|
||||
Err(err) => {
|
||||
@ -686,7 +662,7 @@ enum TransferEncodingKind {
|
||||
Length(u64),
|
||||
/// An Encoder for when Content-Length is not known.
|
||||
///
|
||||
/// Appliction decides when to stop writing.
|
||||
/// Application decides when to stop writing.
|
||||
Eof,
|
||||
}
|
||||
|
||||
@ -727,11 +703,12 @@ impl TransferEncoding {
|
||||
|
||||
/// Encode message. Return `EOF` state of encoder
|
||||
#[inline]
|
||||
pub fn encode(&mut self, msg: &[u8]) -> io::Result<bool> {
|
||||
pub fn encode(&mut self, mut msg: Binary) -> io::Result<bool> {
|
||||
match self.kind {
|
||||
TransferEncodingKind::Eof => {
|
||||
self.buffer.get_mut().extend_from_slice(msg);
|
||||
Ok(msg.is_empty())
|
||||
let eof = msg.is_empty();
|
||||
self.buffer.extend(msg);
|
||||
Ok(eof)
|
||||
},
|
||||
TransferEncodingKind::Chunked(ref mut eof) => {
|
||||
if *eof {
|
||||
@ -740,24 +717,31 @@ impl TransferEncoding {
|
||||
|
||||
if msg.is_empty() {
|
||||
*eof = true;
|
||||
self.buffer.get_mut().extend_from_slice(b"0\r\n\r\n");
|
||||
self.buffer.extend_from_slice(b"0\r\n\r\n");
|
||||
} else {
|
||||
write!(self.buffer.get_mut(), "{:X}\r\n", msg.len())
|
||||
let mut buf = BytesMut::new();
|
||||
write!(&mut buf, "{:X}\r\n", msg.len())
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
|
||||
self.buffer.get_mut().extend_from_slice(msg);
|
||||
self.buffer.get_mut().extend_from_slice(b"\r\n");
|
||||
self.buffer.reserve(buf.len() + msg.len() + 2);
|
||||
self.buffer.extend(buf.into());
|
||||
self.buffer.extend(msg);
|
||||
self.buffer.extend_from_slice(b"\r\n");
|
||||
}
|
||||
Ok(*eof)
|
||||
},
|
||||
TransferEncodingKind::Length(ref mut remaining) => {
|
||||
if msg.is_empty() {
|
||||
return Ok(*remaining == 0)
|
||||
}
|
||||
let max = cmp::min(*remaining, msg.len() as u64);
|
||||
self.buffer.get_mut().extend_from_slice(msg[..max as usize].as_ref());
|
||||
if *remaining > 0 {
|
||||
if msg.is_empty() {
|
||||
return Ok(*remaining == 0)
|
||||
}
|
||||
let len = cmp::min(*remaining, msg.len() as u64);
|
||||
self.buffer.extend(msg.take().split_to(len as usize).into());
|
||||
|
||||
*remaining -= max as u64;
|
||||
Ok(*remaining == 0)
|
||||
*remaining -= len as u64;
|
||||
Ok(*remaining == 0)
|
||||
} else {
|
||||
Ok(true)
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
@ -770,7 +754,7 @@ impl TransferEncoding {
|
||||
TransferEncodingKind::Chunked(ref mut eof) => {
|
||||
if !*eof {
|
||||
*eof = true;
|
||||
self.buffer.get_mut().extend_from_slice(b"0\r\n\r\n");
|
||||
self.buffer.extend_from_slice(b"0\r\n\r\n");
|
||||
}
|
||||
},
|
||||
}
|
||||
@ -781,7 +765,7 @@ impl io::Write for TransferEncoding {
|
||||
|
||||
#[inline]
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
self.encode(buf)?;
|
||||
self.encode(Binary::from_slice(buf))?;
|
||||
Ok(buf.len())
|
||||
}
|
||||
|
||||
@ -867,8 +851,8 @@ mod tests {
|
||||
fn test_chunked_te() {
|
||||
let bytes = SharedBytes::default();
|
||||
let mut enc = TransferEncoding::chunked(bytes.clone());
|
||||
assert!(!enc.encode(b"test").ok().unwrap());
|
||||
assert!(enc.encode(b"").ok().unwrap());
|
||||
assert!(!enc.encode(Binary::from(b"test".as_ref())).ok().unwrap());
|
||||
assert!(enc.encode(Binary::from(b"".as_ref())).ok().unwrap());
|
||||
assert_eq!(bytes.get_mut().take().freeze(),
|
||||
Bytes::from_static(b"4\r\ntest\r\n0\r\n\r\n"));
|
||||
}
|
||||
|
@ -96,12 +96,12 @@ impl<T, H> Http1<T, H>
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: refacrtor
|
||||
// TODO: refactor
|
||||
#[cfg_attr(feature = "cargo-clippy", allow(cyclomatic_complexity))]
|
||||
pub fn poll(&mut self) -> Poll<(), ()> {
|
||||
// keep-alive timer
|
||||
if self.keepalive_timer.is_some() {
|
||||
match self.keepalive_timer.as_mut().unwrap().poll() {
|
||||
if let Some(ref mut timer) = self.keepalive_timer {
|
||||
match timer.poll() {
|
||||
Ok(Async::Ready(_)) => {
|
||||
trace!("Keep-alive timeout, close connection");
|
||||
return Ok(Async::Ready(()))
|
||||
@ -133,7 +133,7 @@ impl<T, H> Http1<T, H>
|
||||
Ok(Async::Ready(ready)) => {
|
||||
not_ready = false;
|
||||
|
||||
// overide keep-alive state
|
||||
// override keep-alive state
|
||||
if self.stream.keepalive() {
|
||||
self.flags.insert(Flags::KEEPALIVE);
|
||||
} else {
|
||||
@ -146,10 +146,8 @@ impl<T, H> Http1<T, H>
|
||||
item.flags.insert(EntryFlags::FINISHED);
|
||||
}
|
||||
},
|
||||
Ok(Async::NotReady) => {
|
||||
// no more IO for this iteration
|
||||
io = true;
|
||||
},
|
||||
// no more IO for this iteration
|
||||
Ok(Async::NotReady) => io = true,
|
||||
Err(err) => {
|
||||
// it is not possible to recover from error
|
||||
// during pipe handling, so just drop connection
|
||||
@ -227,38 +225,7 @@ impl<T, H> Http1<T, H>
|
||||
self.tasks.push_back(
|
||||
Entry {pipe: pipe.unwrap_or_else(|| Pipeline::error(HTTPNotFound)),
|
||||
flags: EntryFlags::empty()});
|
||||
}
|
||||
Err(ReaderError::Disconnect) => {
|
||||
not_ready = false;
|
||||
self.flags.insert(Flags::ERROR);
|
||||
self.stream.disconnected();
|
||||
for entry in &mut self.tasks {
|
||||
entry.pipe.disconnected()
|
||||
}
|
||||
},
|
||||
Err(err) => {
|
||||
// notify all tasks
|
||||
not_ready = false;
|
||||
self.stream.disconnected();
|
||||
for entry in &mut self.tasks {
|
||||
entry.pipe.disconnected()
|
||||
}
|
||||
|
||||
// kill keepalive
|
||||
self.flags.remove(Flags::KEEPALIVE);
|
||||
self.keepalive_timer.take();
|
||||
|
||||
// on parse error, stop reading stream but tasks need to be completed
|
||||
self.flags.insert(Flags::ERROR);
|
||||
|
||||
if self.tasks.is_empty() {
|
||||
if let ReaderError::Error(err) = err {
|
||||
self.tasks.push_back(
|
||||
Entry {pipe: Pipeline::error(err.error_response()),
|
||||
flags: EntryFlags::empty()});
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(Async::NotReady) => {
|
||||
// start keep-alive timer, this also is slow request timeout
|
||||
if self.tasks.is_empty() {
|
||||
@ -293,7 +260,38 @@ impl<T, H> Http1<T, H>
|
||||
}
|
||||
}
|
||||
break
|
||||
}
|
||||
},
|
||||
Err(ReaderError::Disconnect) => {
|
||||
not_ready = false;
|
||||
self.flags.insert(Flags::ERROR);
|
||||
self.stream.disconnected();
|
||||
for entry in &mut self.tasks {
|
||||
entry.pipe.disconnected()
|
||||
}
|
||||
},
|
||||
Err(err) => {
|
||||
// notify all tasks
|
||||
not_ready = false;
|
||||
self.stream.disconnected();
|
||||
for entry in &mut self.tasks {
|
||||
entry.pipe.disconnected()
|
||||
}
|
||||
|
||||
// kill keepalive
|
||||
self.flags.remove(Flags::KEEPALIVE);
|
||||
self.keepalive_timer.take();
|
||||
|
||||
// on parse error, stop reading stream but tasks need to be completed
|
||||
self.flags.insert(Flags::ERROR);
|
||||
|
||||
if self.tasks.is_empty() {
|
||||
if let ReaderError::Error(err) = err {
|
||||
self.tasks.push_back(
|
||||
Entry {pipe: Pipeline::error(err.error_response()),
|
||||
flags: EntryFlags::empty()});
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@ -1204,6 +1202,7 @@ mod tests {
|
||||
panic!("Error");
|
||||
}
|
||||
|
||||
// type in chunked
|
||||
let mut buf = Buffer::new(
|
||||
"GET /test HTTP/1.1\r\n\
|
||||
transfer-encoding: chnked\r\n\r\n");
|
||||
|
@ -2,15 +2,15 @@ use std::io;
|
||||
use bytes::BufMut;
|
||||
use futures::{Async, Poll};
|
||||
use tokio_io::AsyncWrite;
|
||||
use http::Version;
|
||||
use http::{Method, Version};
|
||||
use http::header::{HeaderValue, CONNECTION, DATE};
|
||||
|
||||
use helpers;
|
||||
use body::Body;
|
||||
use helpers::SharedBytes;
|
||||
use body::{Body, Binary};
|
||||
use httprequest::HttpMessage;
|
||||
use httpresponse::HttpResponse;
|
||||
use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE};
|
||||
use super::shared::SharedBytes;
|
||||
use super::encoding::PayloadEncoder;
|
||||
|
||||
const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific
|
||||
@ -56,23 +56,25 @@ impl<T: AsyncWrite> H1Writer<T> {
|
||||
}
|
||||
|
||||
pub fn disconnected(&mut self) {
|
||||
self.encoder.get_mut().take();
|
||||
self.buffer.take();
|
||||
}
|
||||
|
||||
pub fn keepalive(&self) -> bool {
|
||||
self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE)
|
||||
}
|
||||
|
||||
fn write_to_stream(&mut self) -> Result<WriterState, io::Error> {
|
||||
let buffer = self.encoder.get_mut();
|
||||
|
||||
while !buffer.is_empty() {
|
||||
match self.stream.write(buffer.as_ref()) {
|
||||
fn write_to_stream(&mut self) -> io::Result<WriterState> {
|
||||
while !self.buffer.is_empty() {
|
||||
match self.stream.write(self.buffer.as_ref()) {
|
||||
Ok(0) => {
|
||||
self.disconnected();
|
||||
return Ok(WriterState::Done);
|
||||
},
|
||||
Ok(n) => {
|
||||
let _ = buffer.split_to(n);
|
||||
let _ = self.buffer.split_to(n);
|
||||
},
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||
if buffer.len() > MAX_WRITE_BUFFER_SIZE {
|
||||
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
|
||||
return Ok(WriterState::Pause)
|
||||
} else {
|
||||
return Ok(WriterState::Done)
|
||||
@ -92,23 +94,7 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
|
||||
self.written
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn flush(&mut self) -> Poll<(), io::Error> {
|
||||
match self.stream.flush() {
|
||||
Ok(_) => Ok(Async::Ready(())),
|
||||
Err(e) => {
|
||||
if e.kind() == io::ErrorKind::WouldBlock {
|
||||
Ok(Async::NotReady)
|
||||
} else {
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn start(&mut self, req: &mut HttpMessage, msg: &mut HttpResponse)
|
||||
-> Result<WriterState, io::Error>
|
||||
{
|
||||
fn start(&mut self, req: &mut HttpMessage, msg: &mut HttpResponse) -> io::Result<WriterState> {
|
||||
// prepare task
|
||||
self.flags.insert(Flags::STARTED);
|
||||
self.encoder = PayloadEncoder::new(self.buffer.clone(), req, msg);
|
||||
@ -133,7 +119,7 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
|
||||
|
||||
// render message
|
||||
{
|
||||
let mut buffer = self.encoder.get_mut();
|
||||
let mut buffer = self.buffer.get_mut();
|
||||
if let Body::Binary(ref bytes) = body {
|
||||
buffer.reserve(256 + msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len());
|
||||
} else {
|
||||
@ -146,7 +132,11 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
|
||||
|
||||
match body {
|
||||
Body::Empty =>
|
||||
buffer.extend_from_slice(b"\r\ncontent-length: 0\r\n"),
|
||||
if req.method != Method::HEAD {
|
||||
buffer.extend_from_slice(b"\r\ncontent-length: 0\r\n");
|
||||
} else {
|
||||
buffer.extend_from_slice(b"\r\n");
|
||||
},
|
||||
Body::Binary(ref bytes) =>
|
||||
helpers::write_content_length(bytes.len(), &mut buffer),
|
||||
_ =>
|
||||
@ -176,14 +166,14 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
|
||||
|
||||
if let Body::Binary(bytes) = body {
|
||||
self.written = bytes.len() as u64;
|
||||
self.encoder.write(bytes.as_ref())?;
|
||||
self.encoder.write(bytes)?;
|
||||
} else {
|
||||
msg.replace_body(body);
|
||||
}
|
||||
Ok(WriterState::Done)
|
||||
}
|
||||
|
||||
fn write(&mut self, payload: &[u8]) -> Result<WriterState, io::Error> {
|
||||
fn write(&mut self, payload: Binary) -> io::Result<WriterState> {
|
||||
self.written += payload.len() as u64;
|
||||
if !self.flags.contains(Flags::DISCONNECTED) {
|
||||
if self.flags.contains(Flags::STARTED) {
|
||||
@ -192,24 +182,24 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
|
||||
return Ok(WriterState::Done)
|
||||
} else {
|
||||
// might be response to EXCEPT
|
||||
self.encoder.get_mut().extend_from_slice(payload)
|
||||
self.buffer.extend_from_slice(payload.as_ref())
|
||||
}
|
||||
}
|
||||
|
||||
if self.encoder.len() > MAX_WRITE_BUFFER_SIZE {
|
||||
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
|
||||
Ok(WriterState::Pause)
|
||||
} else {
|
||||
Ok(WriterState::Done)
|
||||
}
|
||||
}
|
||||
|
||||
fn write_eof(&mut self) -> Result<WriterState, io::Error> {
|
||||
fn write_eof(&mut self) -> io::Result<WriterState> {
|
||||
self.encoder.write_eof()?;
|
||||
|
||||
if !self.encoder.is_eof() {
|
||||
Err(io::Error::new(io::ErrorKind::Other,
|
||||
"Last payload item, but eof is not reached"))
|
||||
} else if self.encoder.len() > MAX_WRITE_BUFFER_SIZE {
|
||||
} else if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
|
||||
Ok(WriterState::Pause)
|
||||
} else {
|
||||
Ok(WriterState::Done)
|
||||
|
@ -7,11 +7,11 @@ use http::{Version, HttpTryFrom, Response};
|
||||
use http::header::{HeaderValue, CONNECTION, TRANSFER_ENCODING, DATE, CONTENT_LENGTH};
|
||||
|
||||
use helpers;
|
||||
use body::Body;
|
||||
use helpers::SharedBytes;
|
||||
use body::{Body, Binary};
|
||||
use httprequest::HttpMessage;
|
||||
use httpresponse::HttpResponse;
|
||||
use super::encoding::PayloadEncoder;
|
||||
use super::shared::SharedBytes;
|
||||
use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE};
|
||||
|
||||
const CHUNK_SIZE: usize = 16_384;
|
||||
@ -52,15 +52,13 @@ impl H2Writer {
|
||||
}
|
||||
}
|
||||
|
||||
fn write_to_stream(&mut self) -> Result<WriterState, io::Error> {
|
||||
fn write_to_stream(&mut self) -> io::Result<WriterState> {
|
||||
if !self.flags.contains(Flags::STARTED) {
|
||||
return Ok(WriterState::Done)
|
||||
}
|
||||
|
||||
if let Some(ref mut stream) = self.stream {
|
||||
let buffer = self.encoder.get_mut();
|
||||
|
||||
if buffer.is_empty() {
|
||||
if self.buffer.is_empty() {
|
||||
if self.flags.contains(Flags::EOF) {
|
||||
let _ = stream.send_data(Bytes::new(), true);
|
||||
}
|
||||
@ -70,7 +68,7 @@ impl H2Writer {
|
||||
loop {
|
||||
match stream.poll_capacity() {
|
||||
Ok(Async::NotReady) => {
|
||||
if buffer.len() > MAX_WRITE_BUFFER_SIZE {
|
||||
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
|
||||
return Ok(WriterState::Pause)
|
||||
} else {
|
||||
return Ok(WriterState::Done)
|
||||
@ -80,15 +78,15 @@ impl H2Writer {
|
||||
return Ok(WriterState::Done)
|
||||
}
|
||||
Ok(Async::Ready(Some(cap))) => {
|
||||
let len = buffer.len();
|
||||
let bytes = buffer.split_to(cmp::min(cap, len));
|
||||
let eof = buffer.is_empty() && self.flags.contains(Flags::EOF);
|
||||
let len = self.buffer.len();
|
||||
let bytes = self.buffer.split_to(cmp::min(cap, len));
|
||||
let eof = self.buffer.is_empty() && self.flags.contains(Flags::EOF);
|
||||
self.written += bytes.len() as u64;
|
||||
|
||||
if let Err(err) = stream.send_data(bytes.freeze(), eof) {
|
||||
return Err(io::Error::new(io::ErrorKind::Other, err))
|
||||
} else if !buffer.is_empty() {
|
||||
let cap = cmp::min(buffer.len(), CHUNK_SIZE);
|
||||
} else if !self.buffer.is_empty() {
|
||||
let cap = cmp::min(self.buffer.len(), CHUNK_SIZE);
|
||||
stream.reserve_capacity(cap);
|
||||
} else {
|
||||
return Ok(WriterState::Pause)
|
||||
@ -110,16 +108,7 @@ impl Writer for H2Writer {
|
||||
self.written
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn flush(&mut self) -> Poll<(), io::Error> {
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
|
||||
fn start(&mut self, req: &mut HttpMessage, msg: &mut HttpResponse)
|
||||
-> Result<WriterState, io::Error>
|
||||
{
|
||||
// trace!("Prepare response with status: {:?}", msg.status());
|
||||
|
||||
fn start(&mut self, req: &mut HttpMessage, msg: &mut HttpResponse) -> io::Result<WriterState> {
|
||||
// prepare response
|
||||
self.flags.insert(Flags::STARTED);
|
||||
self.encoder = PayloadEncoder::new(self.buffer.clone(), req, msg);
|
||||
@ -172,9 +161,9 @@ impl Writer for H2Writer {
|
||||
if let Body::Binary(bytes) = body {
|
||||
self.flags.insert(Flags::EOF);
|
||||
self.written = bytes.len() as u64;
|
||||
self.encoder.write(bytes.as_ref())?;
|
||||
self.encoder.write(bytes)?;
|
||||
if let Some(ref mut stream) = self.stream {
|
||||
stream.reserve_capacity(cmp::min(self.encoder.len(), CHUNK_SIZE));
|
||||
stream.reserve_capacity(cmp::min(self.buffer.len(), CHUNK_SIZE));
|
||||
}
|
||||
Ok(WriterState::Pause)
|
||||
} else {
|
||||
@ -183,7 +172,7 @@ impl Writer for H2Writer {
|
||||
}
|
||||
}
|
||||
|
||||
fn write(&mut self, payload: &[u8]) -> Result<WriterState, io::Error> {
|
||||
fn write(&mut self, payload: Binary) -> io::Result<WriterState> {
|
||||
self.written = payload.len() as u64;
|
||||
|
||||
if !self.flags.contains(Flags::DISCONNECTED) {
|
||||
@ -192,25 +181,25 @@ impl Writer for H2Writer {
|
||||
self.encoder.write(payload)?;
|
||||
} else {
|
||||
// might be response for EXCEPT
|
||||
self.encoder.get_mut().extend_from_slice(payload)
|
||||
self.buffer.extend_from_slice(payload.as_ref())
|
||||
}
|
||||
}
|
||||
|
||||
if self.encoder.len() > MAX_WRITE_BUFFER_SIZE {
|
||||
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
|
||||
Ok(WriterState::Pause)
|
||||
} else {
|
||||
Ok(WriterState::Done)
|
||||
}
|
||||
}
|
||||
|
||||
fn write_eof(&mut self) -> Result<WriterState, io::Error> {
|
||||
fn write_eof(&mut self) -> io::Result<WriterState> {
|
||||
self.encoder.write_eof()?;
|
||||
|
||||
self.flags.insert(Flags::EOF);
|
||||
if !self.encoder.is_eof() {
|
||||
Err(io::Error::new(io::ErrorKind::Other,
|
||||
"Last payload item, but eof is not reached"))
|
||||
} else if self.encoder.len() > MAX_WRITE_BUFFER_SIZE {
|
||||
} else if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
|
||||
Ok(WriterState::Pause)
|
||||
} else {
|
||||
Ok(WriterState::Done)
|
||||
|
@ -15,11 +15,13 @@ mod h2;
|
||||
mod h1writer;
|
||||
mod h2writer;
|
||||
mod settings;
|
||||
mod shared;
|
||||
mod utils;
|
||||
|
||||
pub use self::srv::HttpServer;
|
||||
pub use self::settings::ServerSettings;
|
||||
|
||||
use body::Binary;
|
||||
use error::Error;
|
||||
use httprequest::{HttpMessage, HttpRequest};
|
||||
use httpresponse::HttpResponse;
|
||||
@ -54,6 +56,12 @@ pub trait HttpHandler: 'static {
|
||||
fn handle(&mut self, req: HttpRequest) -> Result<Box<HttpHandlerTask>, HttpRequest>;
|
||||
}
|
||||
|
||||
impl HttpHandler for Box<HttpHandler> {
|
||||
fn handle(&mut self, req: HttpRequest) -> Result<Box<HttpHandlerTask>, HttpRequest> {
|
||||
self.as_mut().handle(req)
|
||||
}
|
||||
}
|
||||
|
||||
pub trait HttpHandlerTask {
|
||||
|
||||
fn poll(&mut self) -> Poll<(), Error>;
|
||||
@ -90,14 +98,11 @@ pub enum WriterState {
|
||||
pub trait Writer {
|
||||
fn written(&self) -> u64;
|
||||
|
||||
fn start(&mut self, req: &mut HttpMessage, resp: &mut HttpResponse)
|
||||
-> Result<WriterState, io::Error>;
|
||||
fn start(&mut self, req: &mut HttpMessage, resp: &mut HttpResponse) -> io::Result<WriterState>;
|
||||
|
||||
fn write(&mut self, payload: &[u8]) -> Result<WriterState, io::Error>;
|
||||
fn write(&mut self, payload: Binary) -> io::Result<WriterState>;
|
||||
|
||||
fn write_eof(&mut self) -> Result<WriterState, io::Error>;
|
||||
|
||||
fn flush(&mut self) -> Poll<(), io::Error>;
|
||||
fn write_eof(&mut self) -> io::Result<WriterState>;
|
||||
|
||||
fn poll_completed(&mut self, shutdown: bool) -> Poll<(), io::Error>;
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ use std::cell::{Cell, RefCell, RefMut};
|
||||
|
||||
use helpers;
|
||||
use super::channel::Node;
|
||||
use super::shared::{SharedBytes, SharedBytesPool};
|
||||
|
||||
/// Various server settings
|
||||
#[derive(Debug, Clone)]
|
||||
@ -63,7 +64,7 @@ pub(crate) struct WorkerSettings<H> {
|
||||
h: RefCell<Vec<H>>,
|
||||
enabled: bool,
|
||||
keep_alive: u64,
|
||||
bytes: Rc<helpers::SharedBytesPool>,
|
||||
bytes: Rc<SharedBytesPool>,
|
||||
messages: Rc<helpers::SharedMessagePool>,
|
||||
channels: Cell<usize>,
|
||||
node: Node<()>,
|
||||
@ -75,7 +76,7 @@ impl<H> WorkerSettings<H> {
|
||||
h: RefCell::new(h),
|
||||
enabled: if let Some(ka) = keep_alive { ka > 0 } else { false },
|
||||
keep_alive: keep_alive.unwrap_or(0),
|
||||
bytes: Rc::new(helpers::SharedBytesPool::new()),
|
||||
bytes: Rc::new(SharedBytesPool::new()),
|
||||
messages: Rc::new(helpers::SharedMessagePool::new()),
|
||||
channels: Cell::new(0),
|
||||
node: Node::head(),
|
||||
@ -102,8 +103,8 @@ impl<H> WorkerSettings<H> {
|
||||
self.enabled
|
||||
}
|
||||
|
||||
pub fn get_shared_bytes(&self) -> helpers::SharedBytes {
|
||||
helpers::SharedBytes::new(self.bytes.get_bytes(), Rc::clone(&self.bytes))
|
||||
pub fn get_shared_bytes(&self) -> SharedBytes {
|
||||
SharedBytes::new(self.bytes.get_bytes(), Rc::clone(&self.bytes))
|
||||
}
|
||||
|
||||
pub fn get_http_message(&self) -> helpers::SharedHttpMessage {
|
||||
|
120
src/server/shared.rs
Normal file
120
src/server/shared.rs
Normal file
@ -0,0 +1,120 @@
|
||||
use std::mem;
|
||||
use std::cell::RefCell;
|
||||
use std::rc::Rc;
|
||||
use std::collections::VecDeque;
|
||||
use bytes::BytesMut;
|
||||
|
||||
use body::Binary;
|
||||
|
||||
|
||||
/// Internal use only! unsafe
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct SharedBytesPool(RefCell<VecDeque<Rc<BytesMut>>>);
|
||||
|
||||
impl SharedBytesPool {
|
||||
pub fn new() -> SharedBytesPool {
|
||||
SharedBytesPool(RefCell::new(VecDeque::with_capacity(128)))
|
||||
}
|
||||
|
||||
pub fn get_bytes(&self) -> Rc<BytesMut> {
|
||||
if let Some(bytes) = self.0.borrow_mut().pop_front() {
|
||||
bytes
|
||||
} else {
|
||||
Rc::new(BytesMut::new())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn release_bytes(&self, mut bytes: Rc<BytesMut>) {
|
||||
let v = &mut self.0.borrow_mut();
|
||||
if v.len() < 128 {
|
||||
Rc::get_mut(&mut bytes).unwrap().take();
|
||||
v.push_front(bytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct SharedBytes(
|
||||
Option<Rc<BytesMut>>, Option<Rc<SharedBytesPool>>);
|
||||
|
||||
impl Drop for SharedBytes {
|
||||
fn drop(&mut self) {
|
||||
if let Some(ref pool) = self.1 {
|
||||
if let Some(bytes) = self.0.take() {
|
||||
if Rc::strong_count(&bytes) == 1 {
|
||||
pool.release_bytes(bytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SharedBytes {
|
||||
|
||||
pub fn empty() -> Self {
|
||||
SharedBytes(None, None)
|
||||
}
|
||||
|
||||
pub fn new(bytes: Rc<BytesMut>, pool: Rc<SharedBytesPool>) -> SharedBytes {
|
||||
SharedBytes(Some(bytes), Some(pool))
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
#[allow(mutable_transmutes)]
|
||||
#[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref, inline_always))]
|
||||
pub fn get_mut(&self) -> &mut BytesMut {
|
||||
let r: &BytesMut = self.0.as_ref().unwrap().as_ref();
|
||||
unsafe{mem::transmute(r)}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn len(&self) -> usize {
|
||||
self.0.as_ref().unwrap().len()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.0.as_ref().unwrap().is_empty()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn as_ref(&self) -> &[u8] {
|
||||
self.0.as_ref().unwrap().as_ref()
|
||||
}
|
||||
|
||||
pub fn split_to(&self, n: usize) -> BytesMut {
|
||||
self.get_mut().split_to(n)
|
||||
}
|
||||
|
||||
pub fn take(&self) -> BytesMut {
|
||||
self.get_mut().take()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn reserve(&self, cnt: usize) {
|
||||
self.get_mut().reserve(cnt)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
#[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))]
|
||||
pub fn extend(&self, data: Binary) {
|
||||
self.get_mut().extend_from_slice(data.as_ref());
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn extend_from_slice(&self, data: &[u8]) {
|
||||
self.get_mut().extend_from_slice(data);
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for SharedBytes {
|
||||
fn default() -> Self {
|
||||
SharedBytes(Some(Rc::new(BytesMut::new())), None)
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for SharedBytes {
|
||||
fn clone(&self) -> SharedBytes {
|
||||
SharedBytes(self.0.clone(), self.1.clone())
|
||||
}
|
||||
}
|
@ -268,9 +268,9 @@ impl<H: HttpHandler, U, V> HttpServer<TcpStream, net::SocketAddr, H, U>
|
||||
where U: IntoIterator<Item=V> + 'static,
|
||||
V: IntoHttpHandler<Handler=H>,
|
||||
{
|
||||
/// Start listening for incomming connections.
|
||||
/// Start listening for incoming connections.
|
||||
///
|
||||
/// This method starts number of http handler workers in seperate threads.
|
||||
/// This method starts number of http handler workers in separate threads.
|
||||
/// For each address this method starts separate thread which does `accept()` in a loop.
|
||||
///
|
||||
/// This methods panics if no socket addresses get bound.
|
||||
@ -298,7 +298,7 @@ impl<H: HttpHandler, U, V> HttpServer<TcpStream, net::SocketAddr, H, U>
|
||||
pub fn start(mut self) -> SyncAddress<Self>
|
||||
{
|
||||
if self.sockets.is_empty() {
|
||||
panic!("HttpServer::bind() has to be called befor start()");
|
||||
panic!("HttpServer::bind() has to be called before start()");
|
||||
} else {
|
||||
let addrs: Vec<(net::SocketAddr, net::TcpListener)> =
|
||||
self.sockets.drain().collect();
|
||||
@ -320,7 +320,7 @@ impl<H: HttpHandler, U, V> HttpServer<TcpStream, net::SocketAddr, H, U>
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn new thread and start listening for incomming connections.
|
||||
/// Spawn new thread and start listening for incoming connections.
|
||||
///
|
||||
/// This method spawns new thread and starts new actix system. Other than that it is
|
||||
/// similar to `start()` method. This method blocks.
|
||||
@ -359,7 +359,7 @@ impl<H: HttpHandler, U, V> HttpServer<TlsStream<TcpStream>, net::SocketAddr, H,
|
||||
where U: IntoIterator<Item=V> + 'static,
|
||||
V: IntoHttpHandler<Handler=H>,
|
||||
{
|
||||
/// Start listening for incomming tls connections.
|
||||
/// Start listening for incoming tls connections.
|
||||
pub fn start_tls(mut self, pkcs12: ::Pkcs12) -> io::Result<SyncAddress<Self>> {
|
||||
if self.sockets.is_empty() {
|
||||
Err(io::Error::new(io::ErrorKind::Other, "No socket addresses are bound"))
|
||||
@ -398,7 +398,7 @@ impl<H: HttpHandler, U, V> HttpServer<SslStream<TcpStream>, net::SocketAddr, H,
|
||||
where U: IntoIterator<Item=V> + 'static,
|
||||
V: IntoHttpHandler<Handler=H>,
|
||||
{
|
||||
/// Start listening for incomming tls connections.
|
||||
/// Start listening for incoming tls connections.
|
||||
///
|
||||
/// This method sets alpn protocols to "h2" and "http/1.1"
|
||||
pub fn start_ssl(mut self, identity: &ParsedPkcs12) -> io::Result<SyncAddress<Self>> {
|
||||
@ -443,7 +443,7 @@ impl<T, A, H, U, V> HttpServer<WrapperStream<T>, A, H, U>
|
||||
U: IntoIterator<Item=V> + 'static,
|
||||
V: IntoHttpHandler<Handler=H>,
|
||||
{
|
||||
/// Start listening for incomming connections from a stream.
|
||||
/// Start listening for incoming connections from a stream.
|
||||
///
|
||||
/// This method uses only one thread for handling incoming connections.
|
||||
pub fn start_incoming<S>(mut self, stream: S, secure: bool) -> SyncAddress<Self>
|
||||
@ -663,7 +663,7 @@ fn start_accept_thread(sock: net::TcpListener, addr: net::SocketAddr, backlog: i
|
||||
}
|
||||
}
|
||||
|
||||
// Start listening for incommin commands
|
||||
// Start listening for incoming commands
|
||||
if let Err(err) = poll.register(®, CMD,
|
||||
mio::Ready::readable(), mio::PollOpt::edge()) {
|
||||
panic!("Can not register Registration: {}", err);
|
||||
|
10
src/test.rs
10
src/test.rs
@ -29,7 +29,7 @@ use server::{HttpServer, HttpHandler, IntoHttpHandler, ServerSettings};
|
||||
/// The `TestServer` type.
|
||||
///
|
||||
/// `TestServer` is very simple test server that simplify process of writing
|
||||
/// integrational tests cases for actix web applications.
|
||||
/// integration tests cases for actix web applications.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
@ -61,7 +61,7 @@ impl TestServer {
|
||||
|
||||
/// Start new test server
|
||||
///
|
||||
/// This methos accepts configuration method. You can add
|
||||
/// This method accepts configuration method. You can add
|
||||
/// middlewares or set handlers for test application.
|
||||
pub fn new<F>(config: F) -> Self
|
||||
where F: Sync + Send + 'static + Fn(&mut TestApp<()>),
|
||||
@ -101,7 +101,7 @@ impl TestServer {
|
||||
|
||||
/// Start new test server with custom application state
|
||||
///
|
||||
/// This methos accepts state factory and configuration method.
|
||||
/// This method accepts state factory and configuration method.
|
||||
pub fn with_state<S, FS, F>(state: FS, config: F) -> Self
|
||||
where S: 'static,
|
||||
FS: Sync + Send + 'static + Fn() -> S,
|
||||
@ -287,12 +287,12 @@ impl Default for TestRequest<()> {
|
||||
|
||||
impl TestRequest<()> {
|
||||
|
||||
/// Create TestReqeust and set request uri
|
||||
/// Create TestRequest and set request uri
|
||||
pub fn with_uri(path: &str) -> TestRequest<()> {
|
||||
TestRequest::default().uri(path)
|
||||
}
|
||||
|
||||
/// Create TestReqeust and set header
|
||||
/// Create TestRequest and set header
|
||||
pub fn with_header<K, V>(key: K, value: V) -> TestRequest<()>
|
||||
where HeaderName: HttpTryFrom<K>,
|
||||
HeaderValue: HttpTryFrom<V>
|
||||
|
@ -5,14 +5,7 @@ use bytes::BytesMut;
|
||||
|
||||
use body::Binary;
|
||||
use ws::proto::{OpCode, CloseCode};
|
||||
|
||||
|
||||
fn apply_mask(buf: &mut [u8], mask: &[u8; 4]) {
|
||||
let iter = buf.iter_mut().zip(mask.iter().cycle());
|
||||
for (byte, &key) in iter {
|
||||
*byte ^= key
|
||||
}
|
||||
}
|
||||
use ws::mask::apply_mask;
|
||||
|
||||
/// A struct representing a `WebSocket` frame.
|
||||
#[derive(Debug)]
|
||||
@ -28,7 +21,7 @@ pub(crate) struct Frame {
|
||||
|
||||
impl Frame {
|
||||
|
||||
/// Desctructe frame
|
||||
/// Destruct frame
|
||||
pub fn unpack(self) -> (bool, OpCode, Binary) {
|
||||
(self.finished, self.opcode, self.payload)
|
||||
}
|
||||
|
120
src/ws/mask.rs
Normal file
120
src/ws/mask.rs
Normal file
@ -0,0 +1,120 @@
|
||||
//! This is code from [Tungstenite project](https://github.com/snapview/tungstenite-rs)
|
||||
use std::cmp::min;
|
||||
use std::mem::uninitialized;
|
||||
use std::ptr::copy_nonoverlapping;
|
||||
|
||||
/// Mask/unmask a frame.
|
||||
#[inline]
|
||||
pub fn apply_mask(buf: &mut [u8], mask: &[u8; 4]) {
|
||||
apply_mask_fast32(buf, mask)
|
||||
}
|
||||
|
||||
/// A safe unoptimized mask application.
|
||||
#[inline]
|
||||
#[allow(dead_code)]
|
||||
fn apply_mask_fallback(buf: &mut [u8], mask: &[u8; 4]) {
|
||||
for (i, byte) in buf.iter_mut().enumerate() {
|
||||
*byte ^= mask[i & 3];
|
||||
}
|
||||
}
|
||||
|
||||
/// Faster version of `apply_mask()` which operates on 4-byte blocks.
|
||||
#[inline]
|
||||
#[allow(dead_code)]
|
||||
fn apply_mask_fast32(buf: &mut [u8], mask: &[u8; 4]) {
|
||||
// TODO replace this with read_unaligned() as it stabilizes.
|
||||
let mask_u32 = unsafe {
|
||||
let mut m: u32 = uninitialized();
|
||||
#[allow(trivial_casts)]
|
||||
copy_nonoverlapping(mask.as_ptr(), &mut m as *mut _ as *mut u8, 4);
|
||||
m
|
||||
};
|
||||
|
||||
let mut ptr = buf.as_mut_ptr();
|
||||
let mut len = buf.len();
|
||||
|
||||
// Possible first unaligned block.
|
||||
let head = min(len, (4 - (ptr as usize & 3)) & 3);
|
||||
let mask_u32 = if head > 0 {
|
||||
unsafe {
|
||||
xor_mem(ptr, mask_u32, head);
|
||||
ptr = ptr.offset(head as isize);
|
||||
}
|
||||
len -= head;
|
||||
if cfg!(target_endian = "big") {
|
||||
mask_u32.rotate_left(8 * head as u32)
|
||||
} else {
|
||||
mask_u32.rotate_right(8 * head as u32)
|
||||
}
|
||||
} else {
|
||||
mask_u32
|
||||
};
|
||||
|
||||
if len > 0 {
|
||||
debug_assert_eq!(ptr as usize % 4, 0);
|
||||
}
|
||||
|
||||
// Properly aligned middle of the data.
|
||||
while len > 4 {
|
||||
unsafe {
|
||||
*(ptr as *mut u32) ^= mask_u32;
|
||||
ptr = ptr.offset(4);
|
||||
len -= 4;
|
||||
}
|
||||
}
|
||||
|
||||
// Possible last block.
|
||||
if len > 0 {
|
||||
unsafe { xor_mem(ptr, mask_u32, len); }
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
// TODO: copy_nonoverlapping here compiles to call memcpy. While it is not so inefficient,
|
||||
// it could be done better. The compiler does not see that len is limited to 3.
|
||||
unsafe fn xor_mem(ptr: *mut u8, mask: u32, len: usize) {
|
||||
let mut b: u32 = uninitialized();
|
||||
#[allow(trivial_casts)]
|
||||
copy_nonoverlapping(ptr, &mut b as *mut _ as *mut u8, len);
|
||||
b ^= mask;
|
||||
#[allow(trivial_casts)]
|
||||
copy_nonoverlapping(&b as *const _ as *const u8, ptr, len);
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{apply_mask_fallback, apply_mask_fast32};
|
||||
|
||||
#[test]
|
||||
fn test_apply_mask() {
|
||||
let mask = [
|
||||
0x6d, 0xb6, 0xb2, 0x80,
|
||||
];
|
||||
let unmasked = vec![
|
||||
0xf3, 0x00, 0x01, 0x02, 0x03, 0x80, 0x81, 0x82,
|
||||
0xff, 0xfe, 0x00, 0x17, 0x74, 0xf9, 0x12, 0x03,
|
||||
];
|
||||
|
||||
// Check masking with proper alignment.
|
||||
{
|
||||
let mut masked = unmasked.clone();
|
||||
apply_mask_fallback(&mut masked, &mask);
|
||||
|
||||
let mut masked_fast = unmasked.clone();
|
||||
apply_mask_fast32(&mut masked_fast, &mask);
|
||||
|
||||
assert_eq!(masked, masked_fast);
|
||||
}
|
||||
|
||||
// Check masking without alignment.
|
||||
{
|
||||
let mut masked = unmasked.clone();
|
||||
apply_mask_fallback(&mut masked[1..], &mask);
|
||||
|
||||
let mut masked_fast = unmasked.clone();
|
||||
apply_mask_fast32(&mut masked_fast[1..], &mask);
|
||||
|
||||
assert_eq!(masked, masked_fast);
|
||||
}
|
||||
}
|
||||
}
|
@ -58,6 +58,7 @@ use httpresponse::{ConnectionType, HttpResponse, HttpResponseBuilder};
|
||||
mod frame;
|
||||
mod proto;
|
||||
mod context;
|
||||
mod mask;
|
||||
|
||||
use ws::frame::Frame;
|
||||
use ws::proto::{hash_key, OpCode};
|
||||
|
@ -152,6 +152,66 @@ fn test_body_br_streaming() {
|
||||
assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_head_empty() {
|
||||
let srv = test::TestServer::new(
|
||||
|app| app.handler(|_| {
|
||||
httpcodes::HTTPOk.build()
|
||||
.content_length(STR.len() as u64).finish()}));
|
||||
|
||||
let client = reqwest::Client::new();
|
||||
let mut res = client.head(&srv.url("/")).send().unwrap();
|
||||
assert!(res.status().is_success());
|
||||
let mut bytes = BytesMut::with_capacity(2048).writer();
|
||||
let len = res.headers()
|
||||
.get::<reqwest::header::ContentLength>().map(|ct_len| **ct_len).unwrap();
|
||||
assert_eq!(len, STR.len() as u64);
|
||||
let _ = res.copy_to(&mut bytes);
|
||||
let bytes = bytes.into_inner();
|
||||
assert!(bytes.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_head_binary() {
|
||||
let srv = test::TestServer::new(
|
||||
|app| app.handler(|_| {
|
||||
httpcodes::HTTPOk.build()
|
||||
.content_encoding(headers::ContentEncoding::Identity)
|
||||
.content_length(100).body(STR)}));
|
||||
|
||||
let client = reqwest::Client::new();
|
||||
let mut res = client.head(&srv.url("/")).send().unwrap();
|
||||
assert!(res.status().is_success());
|
||||
let mut bytes = BytesMut::with_capacity(2048).writer();
|
||||
let len = res.headers()
|
||||
.get::<reqwest::header::ContentLength>().map(|ct_len| **ct_len).unwrap();
|
||||
assert_eq!(len, STR.len() as u64);
|
||||
let _ = res.copy_to(&mut bytes);
|
||||
let bytes = bytes.into_inner();
|
||||
assert!(bytes.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_head_binary2() {
|
||||
let srv = test::TestServer::new(
|
||||
|app| app.handler(|_| {
|
||||
httpcodes::HTTPOk.build()
|
||||
.content_encoding(headers::ContentEncoding::Identity)
|
||||
.body(STR)
|
||||
}));
|
||||
|
||||
let client = reqwest::Client::new();
|
||||
let mut res = client.head(&srv.url("/")).send().unwrap();
|
||||
assert!(res.status().is_success());
|
||||
let mut bytes = BytesMut::with_capacity(2048).writer();
|
||||
let len = res.headers()
|
||||
.get::<reqwest::header::ContentLength>().map(|ct_len| **ct_len).unwrap();
|
||||
assert_eq!(len, STR.len() as u64);
|
||||
let _ = res.copy_to(&mut bytes);
|
||||
let bytes = bytes.into_inner();
|
||||
assert!(bytes.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_body_length() {
|
||||
let srv = test::TestServer::new(
|
||||
|
Reference in New Issue
Block a user