1
0
mirror of https://github.com/fafhrd91/actix-web synced 2024-11-30 18:44:35 +01:00

Merge pull request #1 from actix/master

Update from original
This commit is contained in:
Ami44 2017-12-30 21:27:48 +01:00 committed by GitHub
commit e18f9f3f3a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
45 changed files with 1182 additions and 495 deletions

View File

@ -32,6 +32,9 @@ tls = ["native-tls", "tokio-tls"]
# openssl # openssl
alpn = ["openssl", "openssl/v102", "openssl/v110", "tokio-openssl"] alpn = ["openssl", "openssl/v102", "openssl/v110", "tokio-openssl"]
# signals
signal = ["actix/signal"]
[dependencies] [dependencies]
log = "0.3" log = "0.3"
failure = "0.1" failure = "0.1"

View File

@ -34,6 +34,7 @@ fn main() {
* [WebSockets](https://actix.github.io/actix-web/actix_web/ws/index.html) * [WebSockets](https://actix.github.io/actix-web/actix_web/ws/index.html)
* Transparent content compression/decompression (br, gzip, deflate) * Transparent content compression/decompression (br, gzip, deflate)
* Configurable request routing * Configurable request routing
* Graceful server shutdown
* Multipart streams * Multipart streams
* Middlewares ([Logger](https://actix.github.io/actix-web/guide/qs_10.html#logging), * Middlewares ([Logger](https://actix.github.io/actix-web/guide/qs_10.html#logging),
[Session](https://actix.github.io/actix-web/guide/qs_10.html#user-sessions), [Session](https://actix.github.io/actix-web/guide/qs_10.html#user-sessions),
@ -46,8 +47,8 @@ Some basic benchmarks could be found in this [respository](https://github.com/fa
## Examples ## Examples
* [Basic](https://github.com/actix/actix-web/tree/master/examples/basic.rs) * [Basic](https://github.com/actix/actix-web/tree/master/examples/basic/)
* [Stateful](https://github.com/actix/actix-web/tree/master/examples/state.rs) * [Stateful](https://github.com/actix/actix-web/tree/master/examples/state/)
* [Mulitpart streams](https://github.com/actix/actix-web/tree/master/examples/multipart/) * [Mulitpart streams](https://github.com/actix/actix-web/tree/master/examples/multipart/)
* [Simple websocket session](https://github.com/actix/actix-web/tree/master/examples/websocket.rs) * [Simple websocket session](https://github.com/actix/actix-web/tree/master/examples/websocket.rs)
* [Tera templates](https://github.com/actix/actix-web/tree/master/examples/template_tera/) * [Tera templates](https://github.com/actix/actix-web/tree/master/examples/template_tera/)
@ -55,17 +56,15 @@ Some basic benchmarks could be found in this [respository](https://github.com/fa
* [SSL / HTTP/2.0](https://github.com/actix/actix-web/tree/master/examples/tls/) * [SSL / HTTP/2.0](https://github.com/actix/actix-web/tree/master/examples/tls/)
* [Tcp/Websocket chat](https://github.com/actix/actix-web/tree/master/examples/websocket-chat/) * [Tcp/Websocket chat](https://github.com/actix/actix-web/tree/master/examples/websocket-chat/)
* [SockJS Server](https://github.com/actix/actix-sockjs) * [SockJS Server](https://github.com/actix/actix-sockjs)
* [Json](https://github.com/actix/actix-web/tree/master/examples/json/)
## License ## License
This project is licensed under either of This project is licensed under either of
* Apache License, Version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or * Apache License, Version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or [http://www.apache.org/licenses/LICENSE-2.0](http://www.apache.org/licenses/LICENSE-2.0))
http://www.apache.org/licenses/LICENSE-2.0) * MIT license ([LICENSE-MIT](LICENSE-MIT) or [http://opensource.org/licenses/MIT](http://opensource.org/licenses/MIT))
* MIT license ([LICENSE-MIT](LICENSE-MIT) or
http://opensource.org/licenses/MIT)
at your option. at your option.
[![Analytics](https://ga-beacon.appspot.com/UA-110322332-2/actix-web/readme?flat&useReferer)](https://github.com/igrigorik/ga-beacon) [![Analytics](https://ga-beacon.appspot.com/UA-110322332-2/actix-web/readme?flat&useReferer)](https://github.com/igrigorik/ga-beacon)

10
examples/basic/Cargo.toml Normal file
View File

@ -0,0 +1,10 @@
[package]
name = "basic"
version = "0.1.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
[dependencies]
futures = "*"
env_logger = "0.4"
actix = "^0.3.5"
actix-web = { git = "https://github.com/actix/actix-web", features=["signal"] }

19
examples/basic/README.md Normal file
View File

@ -0,0 +1,19 @@
# basic
## Usage
### server
```bash
cd actix-web/examples/basic
cargo run
# Started http server: 127.0.0.1:8080
```
### web client
- [http://localhost:8080/index.html](http://localhost:8080/index.html)
- [http://localhost:8080/async/bob](http://localhost:8080/async/bob)
- [http://localhost:8080/user/bob/](http://localhost:8080/user/bob/) plain/text download
- [http://localhost:8080/test](http://localhost:8080/test) (return status switch GET or POST or other)
- [http://localhost:8080/static/index.html](http://localhost:8080/static/index.html)

View File

@ -7,7 +7,9 @@ extern crate env_logger;
extern crate futures; extern crate futures;
use futures::Stream; use futures::Stream;
use actix::*;
use actix_web::*; use actix_web::*;
#[cfg(target_os = "linux")] use actix::actors::signal::{ProcessSignals, Subscribe};
use actix_web::middleware::RequestSession; use actix_web::middleware::RequestSession;
use futures::future::{FutureResult, result}; use futures::future::{FutureResult, result};
@ -57,7 +59,7 @@ fn main() {
let _ = env_logger::init(); let _ = env_logger::init();
let sys = actix::System::new("basic-example"); let sys = actix::System::new("basic-example");
HttpServer::new( let addr = HttpServer::new(
|| Application::new() || Application::new()
// enable logger // enable logger
.middleware(middleware::Logger::default()) .middleware(middleware::Logger::default())
@ -82,7 +84,7 @@ fn main() {
})) }))
// static files // static files
.resource("/static/{tail:.*}", .resource("/static/{tail:.*}",
|r| r.h(fs::StaticFiles::new("tail", "examples/static/", true))) |r| r.h(fs::StaticFiles::new("tail", "../static/", true)))
// redirect // redirect
.resource("/", |r| r.method(Method::GET).f(|req| { .resource("/", |r| r.method(Method::GET).f(|req| {
println!("{:?}", req); println!("{:?}", req);
@ -94,6 +96,11 @@ fn main() {
.bind("0.0.0.0:8080").unwrap() .bind("0.0.0.0:8080").unwrap()
.start(); .start();
if cfg!(target_os = "linux") { // Subscribe to unix signals
let signals = Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(addr.subscriber()));
}
println!("Starting http server: 127.0.0.1:8080"); println!("Starting http server: 127.0.0.1:8080");
let _ = sys.run(); let _ = sys.run();
} }

View File

@ -1,15 +1,20 @@
# diesel
Diesel's `Getting Started` guide using SQLite for Actix web Diesel's `Getting Started` guide using SQLite for Actix web
## Usage ## Usage
install `diesel_cli` install `diesel_cli`
``` ```bash
cargo install diesel_cli --no-default-features --features sqlite cargo install diesel_cli --no-default-features --features sqlite
``` ```
```bash
echo "DATABASE_URL=file:test.db" > .env
diesel migration run
```
``` ## Postgresql
$ echo "DATABASE_URL=file:test.db" > .env
$ diesel migration run You will also find another complete example of diesel+postgresql on [https://github.com/TechEmpower/FrameworkBenchmarks/tree/master/frameworks/Rust/actix](https://github.com/TechEmpower/FrameworkBenchmarks/tree/master/frameworks/Rust/actix)
```

View File

@ -11,6 +11,7 @@ env_logger = "*"
serde = "1.0" serde = "1.0"
serde_json = "1.0" serde_json = "1.0"
serde_derive = "1.0" serde_derive = "1.0"
json = "*"
actix = "^0.3.1" actix = "^0.3.5"
actix-web = { git = "https://github.com/actix/actix-web.git" } actix-web = { git = "https://github.com/actix/actix-web", features=["signal"] }

48
examples/json/README.md Normal file
View File

@ -0,0 +1,48 @@
# json
Json's `Getting Started` guide using json (serde-json or json-rust) for Actix web
## Usage
### server
```bash
cd actix-web/examples/json
cargo run
# Started http server: 127.0.0.1:8080
```
### web client
With [Postman](https://www.getpostman.com/) or [Rested](moz-extension://60daeb1c-5b1b-4afd-9842-0579ed34dfcb/dist/index.html)
- POST / (embed serde-json):
- method : ``POST``
- url : ``http://127.0.0.1:8080/``
- header : ``Content-Type`` = ``application/json``
- body (raw) : ``{"name": "Test user", "number": 100}``
- POST /manual (manual serde-json):
- method : ``POST``
- url : ``http://127.0.0.1:8080/manual``
- header : ``Content-Type`` = ``application/json``
- body (raw) : ``{"name": "Test user", "number": 100}``
- POST /mjsonrust (manual json-rust):
- method : ``POST``
- url : ``http://127.0.0.1:8080/mjsonrust``
- header : ``Content-Type`` = ``application/json``
- body (raw) : ``{"name": "Test user", "number": 100}`` (you can also test ``{notjson}``)
### python client
- ``pip install aiohttp``
- ``python client.py``
if ubuntu :
- ``pip3 install aiohttp``
- ``python3 client.py``

View File

@ -11,6 +11,7 @@ async def req():
data=json.dumps({"name": "Test user", "number": 100}), data=json.dumps({"name": "Test user", "number": 100}),
headers={"content-type": "application/json"}) headers={"content-type": "application/json"})
print(str(resp)) print(str(resp))
print(await resp.text())
assert 200 == resp.status assert 200 == resp.status

View File

@ -5,10 +5,15 @@ extern crate futures;
extern crate env_logger; extern crate env_logger;
extern crate serde_json; extern crate serde_json;
#[macro_use] extern crate serde_derive; #[macro_use] extern crate serde_derive;
#[macro_use] extern crate json;
use actix::*;
use actix_web::*; use actix_web::*;
#[cfg(target_os = "linux")] use actix::actors::signal::{ProcessSignals, Subscribe};
use bytes::BytesMut; use bytes::BytesMut;
use futures::{Future, Stream}; use futures::{Future, Stream};
use json::JsonValue;
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
struct MyObj { struct MyObj {
@ -16,7 +21,7 @@ struct MyObj {
number: i32, number: i32,
} }
/// This handler uses `HttpRequest::json()` for loading json object. /// This handler uses `HttpRequest::json()` for loading serde json object.
fn index(mut req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>> { fn index(mut req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>> {
req.json() req.json()
.from_err() // convert all errors into `Error` .from_err() // convert all errors into `Error`
@ -30,7 +35,7 @@ fn index(mut req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>> {
const MAX_SIZE: usize = 262_144; // max payload size is 256k const MAX_SIZE: usize = 262_144; // max payload size is 256k
/// This handler manually load request payload and parse json /// This handler manually load request payload and parse serde json
fn index_manual(mut req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>> { fn index_manual(mut req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>> {
// readany() returns asynchronous stream of Bytes objects // readany() returns asynchronous stream of Bytes objects
req.payload_mut().readany() req.payload_mut().readany()
@ -52,27 +57,50 @@ fn index_manual(mut req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Err
// `Future::and_then` can be used to merge an asynchronous workflow with a // `Future::and_then` can be used to merge an asynchronous workflow with a
// synchronous workflow // synchronous workflow
.and_then(|body| { .and_then(|body| {
// body is loaded, now we can deserialize json // body is loaded, now we can deserialize serde-json
let obj = serde_json::from_slice::<MyObj>(&body)?; let obj = serde_json::from_slice::<MyObj>(&body)?;
Ok(httpcodes::HTTPOk.build().json(obj)?) // <- send response Ok(httpcodes::HTTPOk.build().json(obj)?) // <- send response
}) })
.responder() .responder()
} }
/// This handler manually load request payload and parse json-rust
fn index_mjsonrust(mut req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>> {
req.payload_mut().readany().concat2()
.from_err()
.and_then(|body| {
// body is loaded, now we can deserialize json-rust
let result = json::parse(std::str::from_utf8(&body).unwrap()); // return Result
let injson: JsonValue = match result { Ok(v) => v, Err(e) => object!{"err" => e.to_string() } };
Ok(HttpResponse::build(StatusCode::OK)
.content_type("application/json")
.body(injson.dump()).unwrap())
})
.responder()
}
fn main() { fn main() {
::std::env::set_var("RUST_LOG", "actix_web=info"); ::std::env::set_var("RUST_LOG", "actix_web=info");
let _ = env_logger::init(); let _ = env_logger::init();
let sys = actix::System::new("json-example"); let sys = actix::System::new("json-example");
HttpServer::new(|| { let addr = HttpServer::new(|| {
Application::new() Application::new()
// enable logger // enable logger
.middleware(middleware::Logger::default()) .middleware(middleware::Logger::default())
.resource("/manual", |r| r.method(Method::POST).f(index_manual)) .resource("/manual", |r| r.method(Method::POST).f(index_manual))
.resource("/mjsonrust", |r| r.method(Method::POST).f(index_mjsonrust))
.resource("/", |r| r.method(Method::POST).f(index))}) .resource("/", |r| r.method(Method::POST).f(index))})
.bind("127.0.0.1:8080").unwrap() .bind("127.0.0.1:8080").unwrap()
.shutdown_timeout(1)
.start(); .start();
if cfg!(target_os = "linux") { // Subscribe to unix signals
let signals = Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(addr.subscriber()));
}
println!("Started http server: 127.0.0.1:8080"); println!("Started http server: 127.0.0.1:8080");
let _ = sys.run(); let _ = sys.run();
} }

View File

@ -10,5 +10,5 @@ path = "src/main.rs"
[dependencies] [dependencies]
env_logger = "*" env_logger = "*"
futures = "0.1" futures = "0.1"
actix = "^0.3.4" actix = "^0.3.5"
actix-web = { git = "https://github.com/actix/actix-web.git" } actix-web = { git = "https://github.com/actix/actix-web", features=["signal"] }

View File

@ -0,0 +1,24 @@
# multipart
Multipart's `Getting Started` guide for Actix web
## Usage
### server
```bash
cd actix-web/examples/multipart
cargo run (or ``cargo watch -x run``)
# Started http server: 127.0.0.1:8080
```
### client
- ``pip install aiohttp``
- ``python client.py``
- you must see in server console multipart fields
if ubuntu :
- ``pip3 install aiohttp``
- ``python3 client.py``

View File

@ -6,6 +6,8 @@ extern crate futures;
use actix::*; use actix::*;
use actix_web::*; use actix_web::*;
#[cfg(target_os = "linux")] use actix::actors::signal::{ProcessSignals, Subscribe};
use futures::{Future, Stream}; use futures::{Future, Stream};
use futures::future::{result, Either}; use futures::future::{result, Either};
@ -46,13 +48,18 @@ fn main() {
let _ = env_logger::init(); let _ = env_logger::init();
let sys = actix::System::new("multipart-example"); let sys = actix::System::new("multipart-example");
HttpServer::new( let addr = HttpServer::new(
|| Application::new() || Application::new()
.middleware(middleware::Logger::default()) // <- logger .middleware(middleware::Logger::default()) // <- logger
.resource("/multipart", |r| r.method(Method::POST).a(index))) .resource("/multipart", |r| r.method(Method::POST).a(index)))
.bind("127.0.0.1:8080").unwrap() .bind("127.0.0.1:8080").unwrap()
.start(); .start();
if cfg!(target_os = "linux") { // Subscribe to unix signals
let signals = Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(addr.subscriber()));
}
println!("Starting http server: 127.0.0.1:8080"); println!("Starting http server: 127.0.0.1:8080");
let _ = sys.run(); let _ = sys.run();
} }

View File

@ -0,0 +1,14 @@
[package]
name = "signals"
version = "0.1.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
[[bin]]
name = "server"
path = "src/main.rs"
[dependencies]
env_logger = "*"
futures = "0.1"
actix = "^0.3.5"
actix-web = { git = "https://github.com/actix/actix-web.git", features=["signal"] }

View File

@ -0,0 +1,17 @@
# Signals
This example shows how to handle Unix signals and properly stop http server. This example does not work with Windows.
## Usage
```bash
cd actix-web/examples/signal
cargo run (or ``cargo watch -x run``)
# Started http server: 127.0.0.1:8080
# CTRL+C
# INFO:actix_web::server: SIGINT received, exiting
# INFO:actix_web::worker: Shutting down http worker, 0 connections
# INFO:actix_web::worker: Shutting down http worker, 0 connections
# INFO:actix_web::worker: Shutting down http worker, 0 connections
# INFO:actix_web::worker: Shutting down http worker, 0 connections
```

View File

@ -0,0 +1,44 @@
extern crate actix;
extern crate actix_web;
extern crate futures;
extern crate env_logger;
use actix::*;
use actix_web::*;
#[cfg(target_os = "linux")] use actix::actors::signal::{ProcessSignals, Subscribe};
struct MyWebSocket;
impl Actor for MyWebSocket {
type Context = HttpContext<Self>;
}
impl StreamHandler<ws::Message> for MyWebSocket {}
impl Handler<ws::Message> for MyWebSocket {
fn handle(&mut self, _: ws::Message, _: &mut Self::Context) -> Response<Self, ws::Message> {
Self::empty()
}
}
fn main() {
::std::env::set_var("RUST_LOG", "actix_web=info");
let _ = env_logger::init();
let sys = actix::System::new("signals-example");
let addr = HttpServer::new(|| {
Application::new()
// enable logger
.middleware(middleware::Logger::default())
.resource("/ws/", |r| r.f(|req| ws::start(req, MyWebSocket)))
.resource("/", |r| r.h(httpcodes::HTTPOk))})
.bind("127.0.0.1:8080").unwrap()
.start();
if cfg!(target_os = "linux") { // Subscribe to unix signals
let signals = Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(addr.subscriber()));
}
println!("Started http server: 127.0.0.1:8080");
let _ = sys.run();
}

10
examples/state/Cargo.toml Normal file
View File

@ -0,0 +1,10 @@
[package]
name = "state"
version = "0.1.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
[dependencies]
futures = "*"
env_logger = "0.4"
actix = "^0.3.5"
actix-web = { git = "https://github.com/actix/actix-web", features=["signal"] }

15
examples/state/README.md Normal file
View File

@ -0,0 +1,15 @@
# state
## Usage
### server
```bash
cd actix-web/examples/state
cargo run
# Started http server: 127.0.0.1:8080
```
### web client
- [http://localhost:8080/](http://localhost:8080/)

View File

@ -9,6 +9,7 @@ extern crate env_logger;
use actix::*; use actix::*;
use actix_web::*; use actix_web::*;
#[cfg(target_os = "linux")] use actix::actors::signal::{ProcessSignals, Subscribe};
use std::cell::Cell; use std::cell::Cell;
struct AppState { struct AppState {
@ -60,7 +61,7 @@ fn main() {
let _ = env_logger::init(); let _ = env_logger::init();
let sys = actix::System::new("ws-example"); let sys = actix::System::new("ws-example");
HttpServer::new( let addr = HttpServer::new(
|| Application::with_state(AppState{counter: Cell::new(0)}) || Application::with_state(AppState{counter: Cell::new(0)})
// enable logger // enable logger
.middleware(middleware::Logger::default()) .middleware(middleware::Logger::default())
@ -73,6 +74,11 @@ fn main() {
.bind("127.0.0.1:8080").unwrap() .bind("127.0.0.1:8080").unwrap()
.start(); .start();
if cfg!(target_os = "linux") { // Subscribe to unix signals
let signals = Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(addr.subscriber()));
}
println!("Started http server: 127.0.0.1:8080"); println!("Started http server: 127.0.0.1:8080");
let _ = sys.run(); let _ = sys.run();
} }

View File

@ -5,6 +5,6 @@ authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
[dependencies] [dependencies]
env_logger = "0.4" env_logger = "0.4"
actix = "^0.3.1" actix = "^0.3.5"
actix-web = { git = "https://github.com/actix/actix-web.git" } actix-web = { git = "https://github.com/actix/actix-web", features=["signal"] }
tera = "*" tera = "*"

View File

@ -0,0 +1,17 @@
# template_tera
Minimal example of using the template [tera](https://github.com/Keats/tera) that displays a form.
## Usage
### server
```bash
cd actix-web/examples/template_tera
cargo run (or ``cargo watch -x run``)
# Started http server: 127.0.0.1:8080
```
### web client
- [http://localhost:8080](http://localhost:8080)

View File

@ -3,7 +3,10 @@ extern crate actix_web;
extern crate env_logger; extern crate env_logger;
#[macro_use] #[macro_use]
extern crate tera; extern crate tera;
use actix::*;
use actix_web::*; use actix_web::*;
#[cfg(target_os = "linux")] use actix::actors::signal::{ProcessSignals, Subscribe};
struct State { struct State {
template: tera::Tera, // <- store tera template in application state template: tera::Tera, // <- store tera template in application state
@ -30,7 +33,7 @@ fn main() {
let _ = env_logger::init(); let _ = env_logger::init();
let sys = actix::System::new("tera-example"); let sys = actix::System::new("tera-example");
HttpServer::new(|| { let addr = HttpServer::new(|| {
let tera = compile_templates!(concat!(env!("CARGO_MANIFEST_DIR"), "/templates/**/*")); let tera = compile_templates!(concat!(env!("CARGO_MANIFEST_DIR"), "/templates/**/*"));
Application::with_state(State{template: tera}) Application::with_state(State{template: tera})
@ -40,6 +43,11 @@ fn main() {
.bind("127.0.0.1:8080").unwrap() .bind("127.0.0.1:8080").unwrap()
.start(); .start();
if cfg!(target_os = "linux") { // Subscribe to unix signals
let signals = Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(addr.subscriber()));
}
println!("Started http server: 127.0.0.1:8080"); println!("Started http server: 127.0.0.1:8080");
let _ = sys.run(); let _ = sys.run();
} }

View File

@ -9,5 +9,5 @@ path = "src/main.rs"
[dependencies] [dependencies]
env_logger = "0.4" env_logger = "0.4"
actix = "^0.3.1" actix = { version = "^0.3.5" }
actix-web = { git = "https://github.com/actix/actix-web.git", features=["alpn"] } actix-web = { git = "https://github.com/actix/actix-web", features=["signal", "alpn"] }

View File

@ -1,5 +1,16 @@
# tls example # tls example
To start server use command: `cargo run` ## Usage
Test command: `curl -v https://127.0.0.1:8080/index.html --compress -k` ### server
```bash
cd actix-web/examples/tls
cargo run (or ``cargo watch -x run``)
# Started http server: 127.0.0.1:8443
```
### web client
- curl: ``curl -v https://127.0.0.1:8443/index.html --compress -k``
- browser: [https://127.0.0.1:8443/index.html](https://127.0.0.1:8080/index.html)

View File

@ -6,7 +6,9 @@ extern crate env_logger;
use std::fs::File; use std::fs::File;
use std::io::Read; use std::io::Read;
use actix::*;
use actix_web::*; use actix_web::*;
#[cfg(target_os = "linux")] use actix::actors::signal::{ProcessSignals, Subscribe};
/// somple handle /// somple handle
fn index(req: HttpRequest) -> Result<HttpResponse> { fn index(req: HttpRequest) -> Result<HttpResponse> {
@ -29,7 +31,7 @@ fn main() {
file.read_to_end(&mut pkcs12).unwrap(); file.read_to_end(&mut pkcs12).unwrap();
let pkcs12 = Pkcs12::from_der(&pkcs12).unwrap().parse("12345").unwrap(); let pkcs12 = Pkcs12::from_der(&pkcs12).unwrap().parse("12345").unwrap();
HttpServer::new( let addr = HttpServer::new(
|| Application::new() || Application::new()
// enable logger // enable logger
.middleware(middleware::Logger::default()) .middleware(middleware::Logger::default())
@ -45,6 +47,11 @@ fn main() {
.bind("127.0.0.1:8443").unwrap() .bind("127.0.0.1:8443").unwrap()
.start_ssl(&pkcs12).unwrap(); .start_ssl(&pkcs12).unwrap();
if cfg!(target_os = "linux") { // Subscribe to unix signals
let signals = Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(addr.subscriber()));
}
println!("Started http server: 127.0.0.1:8443"); println!("Started http server: 127.0.0.1:8443");
let _ = sys.run(); let _ = sys.run();
} }

View File

@ -24,5 +24,5 @@ serde = "1.0"
serde_json = "1.0" serde_json = "1.0"
serde_derive = "1.0" serde_derive = "1.0"
actix = "^0.3.1" actix = { version = "^0.3.5" }
actix-web = { git = "https://github.com/actix/actix-web.git" } actix-web = { git = "https://github.com/actix/actix-web", features=["signal"] }

View File

@ -9,7 +9,6 @@ Added features:
* Chat server runs in separate thread * Chat server runs in separate thread
* Tcp listener runs in separate thread * Tcp listener runs in separate thread
## Server ## Server
Chat server listens for incoming tcp connections. Server can access several types of message: Chat server listens for incoming tcp connections. Server can access several types of message:
@ -18,8 +17,7 @@ Chat server listens for incoming tcp connections. Server can access several type
* `\join name` - join room, if room does not exist, create new one * `\join name` - join room, if room does not exist, create new one
* `\name name` - set session name * `\name name` - set session name
* `some message` - just string, send messsage to all peers in same room * `some message` - just string, send messsage to all peers in same room
* client has to send heartbeat `Ping` messages, if server does not receive a heartbeat * client has to send heartbeat `Ping` messages, if server does not receive a heartbeat message for 10 seconds connection gets droppped
message for 10 seconds connection gets droppped
To start server use command: `cargo run --bin server` To start server use command: `cargo run --bin server`
@ -29,7 +27,6 @@ Client connects to server. Reads input from stdin and sends to server.
To run client use command: `cargo run --bin client` To run client use command: `cargo run --bin client`
## WebSocket Browser Client ## WebSocket Browser Client
Open url: http://localhost:8080/ Open url: [http://localhost:8080/](http://localhost:8080/)

View File

@ -17,6 +17,7 @@ use std::time::Instant;
use actix::*; use actix::*;
use actix_web::*; use actix_web::*;
#[cfg(target_os = "linux")] use actix::actors::signal::{ProcessSignals, Subscribe};
mod codec; mod codec;
mod server; mod server;
@ -175,7 +176,6 @@ impl StreamHandler<ws::Message> for WsChatSession
} }
} }
fn main() { fn main() {
let _ = env_logger::init(); let _ = env_logger::init();
let sys = actix::System::new("websocket-example"); let sys = actix::System::new("websocket-example");
@ -192,9 +192,8 @@ fn main() {
Ok(()) Ok(())
})); }));
// Create Http server with websocket support // Create Http server with websocket support
HttpServer::new( let addr = HttpServer::new(
move || { move || {
// Websocket sessions state // Websocket sessions state
let state = WsChatSessionState { addr: server.clone() }; let state = WsChatSessionState { addr: server.clone() };
@ -216,5 +215,11 @@ fn main() {
.bind("127.0.0.1:8080").unwrap() .bind("127.0.0.1:8080").unwrap()
.start(); .start();
if cfg!(target_os = "linux") { // Subscribe to unix signals
let signals = Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(addr.subscriber()));
}
println!("Started http server: 127.0.0.1:8080");
let _ = sys.run(); let _ = sys.run();
} }

View File

@ -31,4 +31,4 @@ cd actix-web
cargo run --example basic cargo run --example basic
``` ```
Check `examples/` directory for more examples. Check [examples/](https://github.com/actix/actix-web/tree/master/examples) directory for more examples.

View File

@ -10,7 +10,11 @@ Also it stores application specific state that is shared across all handlers
within same application. within same application.
Application acts as namespace for all routes, i.e all routes for specific application Application acts as namespace for all routes, i.e all routes for specific application
has same url path prefix: has same url path prefix. Application prefix always contains laading "/" slash.
If supplied prefix does not contain leading slash, it get inserted.
Prefix should consists of valud path segments. i.e for application with prefix `/app`
any request with following paths `/app`, `/app/` or `/app/test` would match,
but path `/application` would not match.
```rust,ignore ```rust,ignore
# extern crate actix_web; # extern crate actix_web;
@ -21,14 +25,14 @@ has same url path prefix:
# } # }
# fn main() { # fn main() {
let app = Application::new() let app = Application::new()
.prefix("/prefix") .prefix("/app")
.resource("/index.html", |r| r.method(Method::GET).f(index)) .resource("/index.html", |r| r.method(Method::GET).f(index))
.finish() .finish()
# } # }
``` ```
In this example application with `/prefix` prefix and `index.html` resource In this example application with `/app` prefix and `index.html` resource
get created. This resource is available as on `/prefix/index.html` url. get created. This resource is available as on `/app/index.html` url.
For more information check For more information check
[*URL Matching*](./qs_5.html#using-a-application-prefix-to-compose-applications) section. [*URL Matching*](./qs_5.html#using-a-application-prefix-to-compose-applications) section.
@ -56,6 +60,10 @@ fn main() {
``` ```
All `/app1` requests route to first application, `/app2` to second and then all other to third. All `/app1` requests route to first application, `/app2` to second and then all other to third.
Applications get matched based on registration order, if application with more general
prefix is registered before less generic, that would effectively block less generic
application to get matched. For example if *application* with prefix "/" get registered
as first application, it would match all incoming requests.
## State ## State

View File

@ -1,7 +1,7 @@
# Server # Server
[*HttpServer*](../actix_web/struct.HttpServer.html) type is responsible for [*HttpServer*](../actix_web/struct.HttpServer.html) type is responsible for
serving http requests. *HttpServer* accept applicaiton factory as a parameter, serving http requests. *HttpServer* accept application factory as a parameter,
Application factory must have `Send` + `Sync` bounderies. More about that in Application factory must have `Send` + `Sync` bounderies. More about that in
*multi-threading* section. To bind to specific socket address `bind()` must be used. *multi-threading* section. To bind to specific socket address `bind()` must be used.
This method could be called multiple times. To start http server one of the *start* This method could be called multiple times. To start http server one of the *start*
@ -55,7 +55,7 @@ fn main() {
.bind("127.0.0.1:0").expect("Can not bind to 127.0.0.1:0") .bind("127.0.0.1:0").expect("Can not bind to 127.0.0.1:0")
.spawn(); .spawn();
let _ = addr.call_fut(dev::StopServer).wait(); // <- Send `StopServer` message to server. let _ = addr.call_fut(dev::StopServer{graceful: true}).wait(); // <- Send `StopServer` message to server.
} }
``` ```
@ -164,3 +164,73 @@ fn index(req: HttpRequest) -> HttpResponse {
} }
# fn main() {} # fn main() {}
``` ```
## Graceful shutdown
Actix http server support graceful shutdown. After receiving a stop signal, workers
have specific amount of time to finish serving requests. Workers still alive after the
timeout are force dropped. By default shutdown timeout sets to 30 seconds.
You can change this parameter with `HttpServer::shutdown_timeout()` method.
You can send stop message to server with server address and specify if you what
graceful shutdown or not. `start()` or `spawn()` methods return address of the server.
```rust
# extern crate futures;
# extern crate actix;
# extern crate actix_web;
# use futures::Future;
use actix_web::*;
fn main() {
let addr = HttpServer::new(
|| Application::new()
.resource("/", |r| r.h(httpcodes::HTTPOk)))
.bind("127.0.0.1:0").expect("Can not bind to 127.0.0.1:0")
.shutdown_timeout(60) // <- Set shutdown timeout to 60 seconds
.spawn();
let _ = addr.call_fut(
dev::StopServer{graceful:true}).wait(); // <- Send `StopServer` message to server.
}
```
It is possible to use unix signals on compatible OSs. "signal" feature needs to be enabled
in *Cargo.toml* for *actix-web* dependency.
```toml
[dependencies]
actix-web = { git = "https://github.com/actix/actix-web", features=["signal"] }
```
Then you can subscribe your server to unix signals. Http server handles three signals:
* *SIGINT* - Force shutdown workers
* *SIGTERM* - Graceful shutdown workers
* *SIGQUIT* - Force shutdown workers
```rust,ignore
# extern crate futures;
# extern crate actix;
# extern crate actix_web;
use actix_web::*;
use actix::actors::signal::{ProcessSignals, Subscribe};
fn main() {
let sys = actix::System::new("signals");
let addr = HttpServer::new(|| {
Application::new()
.resource("/", |r| r.h(httpcodes::HTTPOk))})
.bind("127.0.0.1:8080").unwrap()
.start();
// Subscribe to unix signals
let signals = Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(addr.subscriber()));
println!("Started http server: 127.0.0.1:8080");
# actix::Arbiter::system().send(actix::msgs::SystemExit(0));
let _ = sys.run();
}
```

View File

@ -444,8 +444,7 @@ fn main() {
## Using a Application Prefix to Compose Applications ## Using a Application Prefix to Compose Applications
The `Applicaiton::prefix()`" method allows to set specific application prefix. The `Application::prefix()`" method allows to set specific application prefix.
If route_prefix is supplied to the include method, it must be a string.
This prefix represents a resource prefix that will be prepended to all resource patterns added This prefix represents a resource prefix that will be prepended to all resource patterns added
by the resource configuration. This can be used to help mount a set of routes at a different by the resource configuration. This can be used to help mount a set of routes at a different
location than the included callable's author intended while still maintaining the same location than the included callable's author intended while still maintaining the same
@ -471,7 +470,7 @@ fn main() {
In the above example, the *show_users* route will have an effective route pattern of In the above example, the *show_users* route will have an effective route pattern of
*/users/show* instead of */show* because the application's prefix argument will be prepended */users/show* instead of */show* because the application's prefix argument will be prepended
to the pattern. The route will then only match if the URL path is /users/show, to the pattern. The route will then only match if the URL path is */users/show*,
and when the `HttpRequest.url_for()` function is called with the route name show_users, and when the `HttpRequest.url_for()` function is called with the route name show_users,
it will generate a URL with that same path. it will generate a URL with that same path.

View File

@ -1,4 +1,5 @@
use std::rc::Rc; use std::rc::Rc;
use std::cell::RefCell;
use std::collections::HashMap; use std::collections::HashMap;
use handler::Reply; use handler::Reply;
@ -6,7 +7,7 @@ use router::{Router, Pattern};
use resource::Resource; use resource::Resource;
use httprequest::HttpRequest; use httprequest::HttpRequest;
use channel::{HttpHandler, IntoHttpHandler, HttpHandlerTask}; use channel::{HttpHandler, IntoHttpHandler, HttpHandlerTask};
use pipeline::Pipeline; use pipeline::{Pipeline, PipelineHandler};
use middleware::Middleware; use middleware::Middleware;
use server::ServerSettings; use server::ServerSettings;
@ -14,19 +15,20 @@ use server::ServerSettings;
pub struct HttpApplication<S=()> { pub struct HttpApplication<S=()> {
state: Rc<S>, state: Rc<S>,
prefix: String, prefix: String,
default: Resource<S>,
router: Router, router: Router,
resources: Vec<Resource<S>>, inner: Rc<RefCell<Inner<S>>>,
middlewares: Rc<Vec<Box<Middleware<S>>>>, middlewares: Rc<Vec<Box<Middleware<S>>>>,
} }
impl<S: 'static> HttpApplication<S> { pub(crate) struct Inner<S> {
default: Resource<S>,
pub(crate) fn prepare_request(&self, req: HttpRequest) -> HttpRequest<S> { router: Router,
req.with_state(Rc::clone(&self.state), self.router.clone()) resources: Vec<Resource<S>>,
} }
pub(crate) fn run(&mut self, mut req: HttpRequest<S>) -> Reply { impl<S: 'static> PipelineHandler<S> for Inner<S> {
fn handle(&mut self, mut req: HttpRequest<S>) -> Reply {
if let Some(idx) = self.router.recognize(&mut req) { if let Some(idx) = self.router.recognize(&mut req) {
self.resources[idx].handle(req.clone(), Some(&mut self.default)) self.resources[idx].handle(req.clone(), Some(&mut self.default))
} else { } else {
@ -35,27 +37,40 @@ impl<S: 'static> HttpApplication<S> {
} }
} }
#[cfg(test)]
impl<S: 'static> HttpApplication<S> {
pub(crate) fn run(&mut self, req: HttpRequest<S>) -> Reply {
self.inner.borrow_mut().handle(req)
}
pub(crate) fn prepare_request(&self, req: HttpRequest) -> HttpRequest<S> {
req.with_state(Rc::clone(&self.state), self.router.clone())
}
}
impl<S: 'static> HttpHandler for HttpApplication<S> { impl<S: 'static> HttpHandler for HttpApplication<S> {
fn handle(&mut self, req: HttpRequest) -> Result<Box<HttpHandlerTask>, HttpRequest> { fn handle(&mut self, req: HttpRequest) -> Result<Box<HttpHandlerTask>, HttpRequest> {
if req.path().starts_with(&self.prefix) { let m = {
let req = self.prepare_request(req); let path = req.path();
// TODO: redesign run callback path.starts_with(&self.prefix) && (
Ok(Box::new(Pipeline::new(req, Rc::clone(&self.middlewares), path.len() == self.prefix.len() ||
&mut |req: HttpRequest<S>| self.run(req)))) path.split_at(self.prefix.len()).1.starts_with('/'))
};
if m {
let inner = Rc::clone(&self.inner);
let req = req.with_state(Rc::clone(&self.state), self.router.clone());
Ok(Box::new(Pipeline::new(req, Rc::clone(&self.middlewares), inner)))
} else { } else {
Err(req) Err(req)
} }
} }
fn server_settings(&mut self, settings: ServerSettings) {
self.router.set_server_settings(settings);
}
} }
struct ApplicationParts<S> { struct ApplicationParts<S> {
state: S, state: S,
prefix: String, prefix: String,
settings: ServerSettings,
default: Resource<S>, default: Resource<S>,
resources: HashMap<Pattern, Option<Resource<S>>>, resources: HashMap<Pattern, Option<Resource<S>>>,
external: HashMap<String, Pattern>, external: HashMap<String, Pattern>,
@ -76,6 +91,7 @@ impl Application<()> {
parts: Some(ApplicationParts { parts: Some(ApplicationParts {
state: (), state: (),
prefix: "/".to_owned(), prefix: "/".to_owned(),
settings: ServerSettings::default(),
default: Resource::default_not_found(), default: Resource::default_not_found(),
resources: HashMap::new(), resources: HashMap::new(),
external: HashMap::new(), external: HashMap::new(),
@ -103,6 +119,7 @@ impl<S> Application<S> where S: 'static {
parts: Some(ApplicationParts { parts: Some(ApplicationParts {
state: state, state: state,
prefix: "/".to_owned(), prefix: "/".to_owned(),
settings: ServerSettings::default(),
default: Resource::default_not_found(), default: Resource::default_not_found(),
resources: HashMap::new(), resources: HashMap::new(),
external: HashMap::new(), external: HashMap::new(),
@ -115,11 +132,14 @@ impl<S> Application<S> where S: 'static {
/// ///
/// Only requests that matches application's prefix get processed by this application. /// Only requests that matches application's prefix get processed by this application.
/// Application prefix always contains laading "/" slash. If supplied prefix /// Application prefix always contains laading "/" slash. If supplied prefix
/// does not contain leading slash, it get inserted. /// does not contain leading slash, it get inserted. Prefix should
/// consists of valud path segments. i.e for application with
/// prefix `/app` any request with following paths `/app`, `/app/` or `/app/test`
/// would match, but path `/application` would not match.
/// ///
/// In the following example only requests with "/app/" path prefix /// In the following example only requests with "/app/" path prefix
/// get handled. Request with path "/app/test/" will be handled, /// get handled. Request with path "/app/test/" would be handled,
/// but request with path "/other/..." will return *NOT FOUND* /// but request with path "/application" or "/other/..." would return *NOT FOUND*
/// ///
/// ```rust /// ```rust
/// # extern crate actix_web; /// # extern crate actix_web;
@ -266,13 +286,20 @@ impl<S> Application<S> where S: 'static {
resources.insert(pattern, None); resources.insert(pattern, None);
} }
let (router, resources) = Router::new(prefix, resources); let (router, resources) = Router::new(prefix, parts.settings, resources);
let inner = Rc::new(RefCell::new(
Inner {
default: parts.default,
router: router.clone(),
resources: resources }
));
HttpApplication { HttpApplication {
state: Rc::new(parts.state), state: Rc::new(parts.state),
prefix: prefix.to_owned(), prefix: prefix.to_owned(),
default: parts.default, inner: inner,
router: router, router: router.clone(),
resources: resources,
middlewares: Rc::new(parts.middlewares), middlewares: Rc::new(parts.middlewares),
} }
} }
@ -281,7 +308,11 @@ impl<S> Application<S> where S: 'static {
impl<S: 'static> IntoHttpHandler for Application<S> { impl<S: 'static> IntoHttpHandler for Application<S> {
type Handler = HttpApplication<S>; type Handler = HttpApplication<S>;
fn into_handler(mut self) -> HttpApplication<S> { fn into_handler(mut self, settings: ServerSettings) -> HttpApplication<S> {
{
let parts = self.parts.as_mut().expect("Use after finish");
parts.settings = settings;
}
self.finish() self.finish()
} }
} }
@ -289,7 +320,11 @@ impl<S: 'static> IntoHttpHandler for Application<S> {
impl<'a, S: 'static> IntoHttpHandler for &'a mut Application<S> { impl<'a, S: 'static> IntoHttpHandler for &'a mut Application<S> {
type Handler = HttpApplication<S>; type Handler = HttpApplication<S>;
fn into_handler(self) -> HttpApplication<S> { fn into_handler(self, settings: ServerSettings) -> HttpApplication<S> {
{
let parts = self.parts.as_mut().expect("Use after finish");
parts.settings = settings;
}
self.finish() self.finish()
} }
} }
@ -361,4 +396,23 @@ mod tests {
let resp = app.run(req); let resp = app.run(req);
assert_eq!(resp.as_response().unwrap().status(), StatusCode::OK); assert_eq!(resp.as_response().unwrap().status(), StatusCode::OK);
} }
#[test]
fn test_prefix() {
let mut app = Application::new()
.prefix("/test")
.resource("/blah", |r| r.h(httpcodes::HTTPOk))
.finish();
let req = TestRequest::with_uri("/test").finish();
let resp = app.handle(req);
assert!(resp.is_ok());
let req = TestRequest::with_uri("/test/").finish();
let resp = app.handle(req);
assert!(resp.is_ok());
let req = TestRequest::with_uri("/testing").finish();
let resp = app.handle(req);
assert!(resp.is_err());
}
} }

View File

@ -1,17 +1,16 @@
use std::rc::Rc; use std::rc::Rc;
use std::net::SocketAddr; use std::net::SocketAddr;
use actix::dev::*;
use bytes::Bytes; use bytes::Bytes;
use futures::{Future, Poll, Async}; use futures::{Future, Poll, Async};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use h1; use {h1, h2};
use h2;
use error::Error; use error::Error;
use h1writer::Writer; use h1writer::Writer;
use httprequest::HttpRequest; use httprequest::HttpRequest;
use server::{ServerSettings, WorkerSettings}; use server::ServerSettings;
use worker::WorkerSettings;
/// Low level http request handler /// Low level http request handler
#[allow(unused_variables)] #[allow(unused_variables)]
@ -19,9 +18,6 @@ pub trait HttpHandler: 'static {
/// Handle request /// Handle request
fn handle(&mut self, req: HttpRequest) -> Result<Box<HttpHandlerTask>, HttpRequest>; fn handle(&mut self, req: HttpRequest) -> Result<Box<HttpHandlerTask>, HttpRequest>;
/// Set server settings
fn server_settings(&mut self, settings: ServerSettings) {}
} }
pub trait HttpHandlerTask { pub trait HttpHandlerTask {
@ -39,13 +35,13 @@ pub trait IntoHttpHandler {
type Handler: HttpHandler; type Handler: HttpHandler;
/// Convert into `HttpHandler` object. /// Convert into `HttpHandler` object.
fn into_handler(self) -> Self::Handler; fn into_handler(self, settings: ServerSettings) -> Self::Handler;
} }
impl<T: HttpHandler> IntoHttpHandler for T { impl<T: HttpHandler> IntoHttpHandler for T {
type Handler = T; type Handler = T;
fn into_handler(self) -> Self::Handler { fn into_handler(self, _: ServerSettings) -> Self::Handler {
self self
} }
} }
@ -70,6 +66,7 @@ impl<T, H> HttpChannel<T, H>
pub(crate) fn new(h: Rc<WorkerSettings<H>>, pub(crate) fn new(h: Rc<WorkerSettings<H>>,
io: T, peer: Option<SocketAddr>, http2: bool) -> HttpChannel<T, H> io: T, peer: Option<SocketAddr>, http2: bool) -> HttpChannel<T, H>
{ {
h.add_channel();
if http2 { if http2 {
HttpChannel { HttpChannel {
proto: Some(HttpProtocol::H2( proto: Some(HttpProtocol::H2(
@ -88,12 +85,6 @@ impl<T, H> HttpChannel<T, H>
} }
}*/ }*/
impl<T, H> Actor for HttpChannel<T, H>
where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static
{
type Context = Context<Self>;
}
impl<T, H> Future for HttpChannel<T, H> impl<T, H> Future for HttpChannel<T, H>
where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static
{ {
@ -104,16 +95,27 @@ impl<T, H> Future for HttpChannel<T, H>
match self.proto { match self.proto {
Some(HttpProtocol::H1(ref mut h1)) => { Some(HttpProtocol::H1(ref mut h1)) => {
match h1.poll() { match h1.poll() {
Ok(Async::Ready(h1::Http1Result::Done)) => Ok(Async::Ready(h1::Http1Result::Done)) => {
return Ok(Async::Ready(())), h1.settings().remove_channel();
return Ok(Async::Ready(()))
}
Ok(Async::Ready(h1::Http1Result::Switch)) => (), Ok(Async::Ready(h1::Http1Result::Switch)) => (),
Ok(Async::NotReady) => Ok(Async::NotReady) =>
return Ok(Async::NotReady), return Ok(Async::NotReady),
Err(_) => Err(_) => {
return Err(()), h1.settings().remove_channel();
return Err(())
} }
} }
Some(HttpProtocol::H2(ref mut h2)) => return h2.poll(), }
Some(HttpProtocol::H2(ref mut h2)) => {
let result = h2.poll();
match result {
Ok(Async::Ready(())) | Err(_) => h2.settings().remove_channel(),
_ => (),
}
return result
}
None => unreachable!(), None => unreachable!(),
} }

View File

@ -17,7 +17,7 @@ use pipeline::Pipeline;
use encoding::PayloadType; use encoding::PayloadType;
use channel::{HttpHandler, HttpHandlerTask}; use channel::{HttpHandler, HttpHandlerTask};
use h1writer::{Writer, H1Writer}; use h1writer::{Writer, H1Writer};
use server::WorkerSettings; use worker::WorkerSettings;
use httpcodes::HTTPNotFound; use httpcodes::HTTPNotFound;
use httprequest::HttpRequest; use httprequest::HttpRequest;
use error::{ParseError, PayloadError, ResponseError}; use error::{ParseError, PayloadError, ResponseError};
@ -89,6 +89,10 @@ impl<T, H> Http1<T, H>
keepalive_timer: None } keepalive_timer: None }
} }
pub fn settings(&self) -> &WorkerSettings<H> {
self.settings.as_ref()
}
pub fn into_inner(self) -> (Rc<WorkerSettings<H>>, T, Option<SocketAddr>, Bytes) { pub fn into_inner(self) -> (Rc<WorkerSettings<H>>, T, Option<SocketAddr>, Bytes) {
(self.settings, self.stream.into_inner(), self.addr, self.read_buf.freeze()) (self.settings, self.stream.into_inner(), self.addr, self.read_buf.freeze())
} }
@ -888,7 +892,7 @@ mod tests {
use http::{Version, Method}; use http::{Version, Method};
use super::*; use super::*;
use application::HttpApplication; use application::HttpApplication;
use server::WorkerSettings; use worker::WorkerSettings;
struct Buffer { struct Buffer {
buf: Bytes, buf: Bytes,

View File

@ -16,7 +16,7 @@ use tokio_core::reactor::Timeout;
use pipeline::Pipeline; use pipeline::Pipeline;
use h2writer::H2Writer; use h2writer::H2Writer;
use server::WorkerSettings; use worker::WorkerSettings;
use channel::{HttpHandler, HttpHandlerTask}; use channel::{HttpHandler, HttpHandlerTask};
use error::PayloadError; use error::PayloadError;
use encoding::PayloadType; use encoding::PayloadType;
@ -64,6 +64,10 @@ impl<T, H> Http2<T, H>
} }
} }
pub fn settings(&self) -> &WorkerSettings<H> {
self.settings.as_ref()
}
pub fn poll(&mut self) -> Poll<(), ()> { pub fn poll(&mut self) -> Poll<(), ()> {
// server // server
if let State::Server(ref mut server) = self.state { if let State::Server(ref mut server) = self.state {

View File

@ -644,6 +644,7 @@ mod tests {
use router::Pattern; use router::Pattern;
use resource::Resource; use resource::Resource;
use test::TestRequest; use test::TestRequest;
use server::ServerSettings;
#[test] #[test]
fn test_debug() { fn test_debug() {
@ -720,7 +721,7 @@ mod tests {
resource.name("index"); resource.name("index");
let mut map = HashMap::new(); let mut map = HashMap::new();
map.insert(Pattern::new("index", "/{key}/"), Some(resource)); map.insert(Pattern::new("index", "/{key}/"), Some(resource));
let (router, _) = Router::new("", map); let (router, _) = Router::new("", ServerSettings::default(), map);
assert!(router.recognize(&mut req).is_some()); assert!(router.recognize(&mut req).is_some());
assert_eq!(req.match_info().get("key"), Some("value")); assert_eq!(req.match_info().get("key"), Some("value"));
@ -822,7 +823,7 @@ mod tests {
resource.name("index"); resource.name("index");
let mut map = HashMap::new(); let mut map = HashMap::new();
map.insert(Pattern::new("index", "/user/{name}.{ext}"), Some(resource)); map.insert(Pattern::new("index", "/user/{name}.{ext}"), Some(resource));
let (router, _) = Router::new("", map); let (router, _) = Router::new("/", ServerSettings::default(), map);
assert!(router.has_route("/user/test.html")); assert!(router.has_route("/user/test.html"));
assert!(!router.has_route("/test/unknown")); assert!(!router.has_route("/test/unknown"));
@ -839,6 +840,27 @@ mod tests {
assert_eq!(url.ok().unwrap().as_str(), "http://www.rust-lang.org/user/test.html"); assert_eq!(url.ok().unwrap().as_str(), "http://www.rust-lang.org/user/test.html");
} }
#[test]
fn test_url_for_with_prefix() {
let mut headers = HeaderMap::new();
headers.insert(header::HOST,
header::HeaderValue::from_static("www.rust-lang.org"));
let req = HttpRequest::new(
Method::GET, Uri::from_str("/").unwrap(), Version::HTTP_11, headers, None);
let mut resource = Resource::<()>::default();
resource.name("index");
let mut map = HashMap::new();
map.insert(Pattern::new("index", "/user/{name}.{ext}"), Some(resource));
let (router, _) = Router::new("/prefix/", ServerSettings::default(), map);
assert!(router.has_route("/user/test.html"));
assert!(!router.has_route("/prefix/user/test.html"));
let req = req.with_state(Rc::new(()), router);
let url = req.url_for("index", &["test", "html"]);
assert_eq!(url.ok().unwrap().as_str(), "http://www.rust-lang.org/prefix/user/test.html");
}
#[test] #[test]
fn test_url_for_external() { fn test_url_for_external() {
let req = HttpRequest::new( let req = HttpRequest::new(
@ -849,7 +871,7 @@ mod tests {
resource.name("index"); resource.name("index");
let mut map = HashMap::new(); let mut map = HashMap::new();
map.insert(Pattern::new("youtube", "https://youtube.com/watch/{video_id}"), None); map.insert(Pattern::new("youtube", "https://youtube.com/watch/{video_id}"), None);
let (router, _) = Router::new::<()>("", map); let (router, _) = Router::new::<()>("", ServerSettings::default(), map);
assert!(!router.has_route("https://youtube.com/watch/unknown")); assert!(!router.has_route("https://youtube.com/watch/unknown"));
let req = req.with_state(Rc::new(()), router); let req = req.with_state(Rc::new(()), router);

View File

@ -32,6 +32,7 @@
//! * Configurable request routing //! * Configurable request routing
//! * Multipart streams //! * Multipart streams
//! * Middlewares (`Logger`, `Session`, `DefaultHeaders`) //! * Middlewares (`Logger`, `Session`, `DefaultHeaders`)
//! * Graceful server shutdown
//! * Built on top of [Actix](https://github.com/actix/actix). //! * Built on top of [Actix](https://github.com/actix/actix).
#![cfg_attr(actix_nightly, feature( #![cfg_attr(actix_nightly, feature(
@ -47,15 +48,13 @@ extern crate regex;
#[macro_use] #[macro_use]
extern crate bitflags; extern crate bitflags;
#[macro_use] #[macro_use]
extern crate failure;
#[macro_use]
extern crate futures; extern crate futures;
extern crate tokio_io; extern crate tokio_io;
extern crate tokio_core; extern crate tokio_core;
extern crate mio; extern crate mio;
extern crate net2; extern crate net2;
extern crate failure;
#[macro_use] extern crate failure_derive;
extern crate cookie; extern crate cookie;
extern crate http; extern crate http;
extern crate httparse; extern crate httparse;
@ -103,6 +102,7 @@ mod resource;
mod handler; mod handler;
mod pipeline; mod pipeline;
mod server; mod server;
mod worker;
mod channel; mod channel;
mod wsframe; mod wsframe;
mod wsproto; mod wsproto;
@ -170,7 +170,6 @@ pub mod dev {
pub use handler::Handler; pub use handler::Handler;
pub use json::JsonBody; pub use json::JsonBody;
pub use router::{Router, Pattern}; pub use router::{Router, Pattern};
pub use pipeline::Pipeline;
pub use channel::{HttpChannel, HttpHandler, IntoHttpHandler}; pub use channel::{HttpChannel, HttpHandler, IntoHttpHandler};
pub use param::{FromParam, Params}; pub use param::{FromParam, Params};
pub use httprequest::UrlEncoded; pub use httprequest::UrlEncoded;

View File

@ -41,7 +41,7 @@ pub trait RequestSession {
fn session(&mut self) -> Session; fn session(&mut self) -> Session;
} }
impl RequestSession for HttpRequest { impl<S> RequestSession for HttpRequest<S> {
fn session(&mut self) -> Session { fn session(&mut self) -> Session {
if let Some(s_impl) = self.extensions().get_mut::<Arc<SessionImplBox>>() { if let Some(s_impl) = self.extensions().get_mut::<Arc<SessionImplBox>>() {
@ -276,7 +276,7 @@ impl CookieSessionInner {
fn new(key: &[u8]) -> CookieSessionInner { fn new(key: &[u8]) -> CookieSessionInner {
CookieSessionInner { CookieSessionInner {
key: Key::from_master(key), key: Key::from_master(key),
name: "actix_session".to_owned(), name: "actix-session".to_owned(),
path: "/".to_owned(), path: "/".to_owned(),
domain: None, domain: None,
secure: true } secure: true }

View File

@ -1,5 +1,6 @@
use std::{io, mem}; use std::{io, mem};
use std::rc::Rc; use std::rc::Rc;
use std::cell::RefCell;
use std::marker::PhantomData; use std::marker::PhantomData;
use futures::{Async, Poll, Future, Stream}; use futures::{Async, Poll, Future, Stream};
@ -14,21 +15,23 @@ use h1writer::{Writer, WriterState};
use httprequest::HttpRequest; use httprequest::HttpRequest;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
use middleware::{Middleware, Finished, Started, Response}; use middleware::{Middleware, Finished, Started, Response};
use application::Inner;
type Handler<S> = FnMut(HttpRequest<S>) -> Reply; pub(crate) trait PipelineHandler<S> {
pub(crate) type PipelineHandler<'a, S> = &'a mut FnMut(HttpRequest<S>) -> Reply; fn handle(&mut self, req: HttpRequest<S>) -> Reply;
}
pub struct Pipeline<S>(PipelineInfo<S>, PipelineState<S>); pub(crate) struct Pipeline<S, H>(PipelineInfo<S>, PipelineState<S, H>);
enum PipelineState<S> { enum PipelineState<S, H> {
None, None,
Error, Error,
Starting(StartMiddlewares<S>), Starting(StartMiddlewares<S, H>),
Handler(WaitingResponse<S>), Handler(WaitingResponse<S, H>),
RunMiddlewares(RunMiddlewares<S>), RunMiddlewares(RunMiddlewares<S, H>),
Response(ProcessResponse<S>), Response(ProcessResponse<S, H>),
Finishing(FinishingMiddlewares<S>), Finishing(FinishingMiddlewares<S, H>),
Completed(Completed<S>), Completed(Completed<S, H>),
} }
struct PipelineInfo<S> { struct PipelineInfo<S> {
@ -75,11 +78,11 @@ enum PipelineResponse {
Response(Box<Future<Item=HttpResponse, Error=Error>>), Response(Box<Future<Item=HttpResponse, Error=Error>>),
} }
impl<S> Pipeline<S> { impl<S, H: PipelineHandler<S>> Pipeline<S, H> {
pub fn new(req: HttpRequest<S>, pub fn new(req: HttpRequest<S>,
mws: Rc<Vec<Box<Middleware<S>>>>, mws: Rc<Vec<Box<Middleware<S>>>>,
handler: PipelineHandler<S>) -> Pipeline<S> handler: Rc<RefCell<H>>) -> Pipeline<S, H>
{ {
let mut info = PipelineInfo { let mut info = PipelineInfo {
req: req, req: req,
@ -94,15 +97,14 @@ impl<S> Pipeline<S> {
} }
} }
impl Pipeline<()> { impl Pipeline<(), Inner<()>> {
pub fn error<R: Into<HttpResponse>>(err: R) -> Box<HttpHandlerTask> { pub fn error<R: Into<HttpResponse>>(err: R) -> Box<HttpHandlerTask> {
Box::new(Pipeline( Box::new(Pipeline::<(), Inner<()>>(
PipelineInfo::new( PipelineInfo::new(HttpRequest::default()), ProcessResponse::init(err.into())))
HttpRequest::default()), ProcessResponse::init(err.into())))
} }
} }
impl<S> Pipeline<S> { impl<S, H> Pipeline<S, H> {
fn is_done(&self) -> bool { fn is_done(&self) -> bool {
match self.1 { match self.1 {
@ -115,7 +117,7 @@ impl<S> Pipeline<S> {
} }
} }
impl<S> HttpHandlerTask for Pipeline<S> { impl<S, H: PipelineHandler<S>> HttpHandlerTask for Pipeline<S, H> {
fn disconnected(&mut self) { fn disconnected(&mut self) {
if let Some(ref mut context) = self.0.context { if let Some(ref mut context) = self.0.context {
@ -274,20 +276,22 @@ impl<S> HttpHandlerTask for Pipeline<S> {
type Fut = Box<Future<Item=Option<HttpResponse>, Error=Error>>; type Fut = Box<Future<Item=Option<HttpResponse>, Error=Error>>;
/// Middlewares start executor /// Middlewares start executor
struct StartMiddlewares<S> { struct StartMiddlewares<S, H> {
hnd: *mut Handler<S>, hnd: Rc<RefCell<H>>,
fut: Option<Fut>, fut: Option<Fut>,
_s: PhantomData<S>,
} }
impl<S> StartMiddlewares<S> { impl<S, H: PipelineHandler<S>> StartMiddlewares<S, H> {
fn init(info: &mut PipelineInfo<S>, handler: PipelineHandler<S>) -> PipelineState<S> { fn init(info: &mut PipelineInfo<S>, handler: Rc<RefCell<H>>) -> PipelineState<S, H>
{
// execute middlewares, we need this stage because middlewares could be non-async // execute middlewares, we need this stage because middlewares could be non-async
// and we can move to next state immidietly // and we can move to next state immidietly
let len = info.mws.len(); let len = info.mws.len();
loop { loop {
if info.count == len { if info.count == len {
let reply = (&mut *handler)(info.req.clone()); let reply = handler.borrow_mut().handle(info.req.clone());
return WaitingResponse::init(info, reply) return WaitingResponse::init(info, reply)
} else { } else {
match info.mws[info.count].start(&mut info.req) { match info.mws[info.count].start(&mut info.req) {
@ -299,8 +303,9 @@ impl<S> StartMiddlewares<S> {
match fut.poll() { match fut.poll() {
Ok(Async::NotReady) => Ok(Async::NotReady) =>
return PipelineState::Starting(StartMiddlewares { return PipelineState::Starting(StartMiddlewares {
hnd: handler as *const _ as *mut _, hnd: handler,
fut: Some(fut)}), fut: Some(fut),
_s: PhantomData}),
Ok(Async::Ready(resp)) => { Ok(Async::Ready(resp)) => {
if let Some(resp) = resp { if let Some(resp) = resp {
return RunMiddlewares::init(info, resp); return RunMiddlewares::init(info, resp);
@ -317,7 +322,8 @@ impl<S> StartMiddlewares<S> {
} }
} }
fn poll(mut self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S>, PipelineState<S>> { fn poll(mut self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S, H>, PipelineState<S, H>>
{
let len = info.mws.len(); let len = info.mws.len();
'outer: loop { 'outer: loop {
match self.fut.as_mut().unwrap().poll() { match self.fut.as_mut().unwrap().poll() {
@ -329,7 +335,7 @@ impl<S> StartMiddlewares<S> {
return Ok(RunMiddlewares::init(info, resp)); return Ok(RunMiddlewares::init(info, resp));
} }
if info.count == len { if info.count == len {
let reply = (unsafe{&mut *self.hnd})(info.req.clone()); let reply = (*self.hnd.borrow_mut()).handle(info.req.clone());
return Ok(WaitingResponse::init(info, reply)); return Ok(WaitingResponse::init(info, reply));
} else { } else {
loop { loop {
@ -357,29 +363,33 @@ impl<S> StartMiddlewares<S> {
} }
// waiting for response // waiting for response
struct WaitingResponse<S> { struct WaitingResponse<S, H> {
stream: PipelineResponse, stream: PipelineResponse,
_s: PhantomData<S>, _s: PhantomData<S>,
_h: PhantomData<H>,
} }
impl<S> WaitingResponse<S> { impl<S, H> WaitingResponse<S, H> {
#[inline] #[inline]
fn init(info: &mut PipelineInfo<S>, reply: Reply) -> PipelineState<S> fn init(info: &mut PipelineInfo<S>, reply: Reply) -> PipelineState<S, H>
{ {
match reply.into() { match reply.into() {
ReplyItem::Message(resp) => ReplyItem::Message(resp) =>
RunMiddlewares::init(info, resp), RunMiddlewares::init(info, resp),
ReplyItem::Actor(ctx) => ReplyItem::Actor(ctx) =>
PipelineState::Handler( PipelineState::Handler(
WaitingResponse { stream: PipelineResponse::Context(ctx), _s: PhantomData }), WaitingResponse { stream: PipelineResponse::Context(ctx),
_s: PhantomData, _h: PhantomData }),
ReplyItem::Future(fut) => ReplyItem::Future(fut) =>
PipelineState::Handler( PipelineState::Handler(
WaitingResponse { stream: PipelineResponse::Response(fut), _s: PhantomData }), WaitingResponse { stream: PipelineResponse::Response(fut),
_s: PhantomData, _h: PhantomData }),
} }
} }
fn poll(mut self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S>, PipelineState<S>> { fn poll(mut self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S, H>, PipelineState<S, H>>
{
let stream = mem::replace(&mut self.stream, PipelineResponse::None); let stream = mem::replace(&mut self.stream, PipelineResponse::None);
match stream { match stream {
@ -430,15 +440,16 @@ impl<S> WaitingResponse<S> {
} }
/// Middlewares response executor /// Middlewares response executor
struct RunMiddlewares<S> { struct RunMiddlewares<S, H> {
curr: usize, curr: usize,
fut: Option<Box<Future<Item=HttpResponse, Error=Error>>>, fut: Option<Box<Future<Item=HttpResponse, Error=Error>>>,
_s: PhantomData<S>, _s: PhantomData<S>,
_h: PhantomData<H>,
} }
impl<S> RunMiddlewares<S> { impl<S, H> RunMiddlewares<S, H> {
fn init(info: &mut PipelineInfo<S>, mut resp: HttpResponse) -> PipelineState<S> fn init(info: &mut PipelineInfo<S>, mut resp: HttpResponse) -> PipelineState<S, H>
{ {
if info.count == 0 { if info.count == 0 {
return ProcessResponse::init(resp); return ProcessResponse::init(resp);
@ -462,20 +473,23 @@ impl<S> RunMiddlewares<S> {
}, },
Response::Future(fut) => { Response::Future(fut) => {
return PipelineState::RunMiddlewares( return PipelineState::RunMiddlewares(
RunMiddlewares { curr: curr, fut: Some(fut), _s: PhantomData }) RunMiddlewares { curr: curr, fut: Some(fut),
_s: PhantomData, _h: PhantomData })
}, },
}; };
} }
} }
fn poll(mut self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S>, PipelineState<S>> { fn poll(mut self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S,H>, PipelineState<S, H>>
{
let len = info.mws.len(); let len = info.mws.len();
loop { loop {
// poll latest fut // poll latest fut
let mut resp = match self.fut.as_mut().unwrap().poll() { let mut resp = match self.fut.as_mut().unwrap().poll() {
Ok(Async::NotReady) => Ok(Async::NotReady) => {
return Ok(PipelineState::RunMiddlewares(self)), return Err(PipelineState::RunMiddlewares(self))
}
Ok(Async::Ready(resp)) => { Ok(Async::Ready(resp)) => {
self.curr += 1; self.curr += 1;
resp resp
@ -506,12 +520,13 @@ impl<S> RunMiddlewares<S> {
} }
} }
struct ProcessResponse<S> { struct ProcessResponse<S, H> {
resp: HttpResponse, resp: HttpResponse,
iostate: IOState, iostate: IOState,
running: RunningState, running: RunningState,
drain: Option<oneshot::Sender<()>>, drain: Option<oneshot::Sender<()>>,
_s: PhantomData<S>, _s: PhantomData<S>,
_h: PhantomData<H>,
} }
#[derive(PartialEq)] #[derive(PartialEq)]
@ -543,21 +558,21 @@ enum IOState {
Done, Done,
} }
impl<S> ProcessResponse<S> { impl<S, H> ProcessResponse<S, H> {
#[inline] #[inline]
fn init(resp: HttpResponse) -> PipelineState<S> fn init(resp: HttpResponse) -> PipelineState<S, H>
{ {
PipelineState::Response( PipelineState::Response(
ProcessResponse{ resp: resp, ProcessResponse{ resp: resp,
iostate: IOState::Response, iostate: IOState::Response,
running: RunningState::Running, running: RunningState::Running,
drain: None, drain: None,
_s: PhantomData}) _s: PhantomData, _h: PhantomData})
} }
fn poll_io(mut self, io: &mut Writer, info: &mut PipelineInfo<S>) fn poll_io(mut self, io: &mut Writer, info: &mut PipelineInfo<S>)
-> Result<PipelineState<S>, PipelineState<S>> -> Result<PipelineState<S, H>, PipelineState<S, H>>
{ {
if self.drain.is_none() && self.running != RunningState::Paused { if self.drain.is_none() && self.running != RunningState::Paused {
// if task is paused, write buffer is probably full // if task is paused, write buffer is probably full
@ -696,8 +711,16 @@ impl<S> ProcessResponse<S> {
// flush io but only if we need to // flush io but only if we need to
if self.running == RunningState::Paused || self.drain.is_some() { if self.running == RunningState::Paused || self.drain.is_some() {
match io.poll_completed() { match io.poll_completed() {
Ok(Async::Ready(_)) => Ok(Async::Ready(_)) => {
self.running.resume(), self.running.resume();
// resolve drain futures
if let Some(tx) = self.drain.take() {
let _ = tx.send(());
}
// restart io processing
return self.poll_io(io, info);
},
Ok(Async::NotReady) => Ok(Async::NotReady) =>
return Err(PipelineState::Response(self)), return Err(PipelineState::Response(self)),
Err(err) => { Err(err) => {
@ -708,11 +731,6 @@ impl<S> ProcessResponse<S> {
} }
} }
// drain futures
if let Some(tx) = self.drain.take() {
let _ = tx.send(());
}
// response is completed // response is completed
match self.iostate { match self.iostate {
IOState::Done => { IOState::Done => {
@ -725,25 +743,28 @@ impl<S> ProcessResponse<S> {
} }
/// Middlewares start executor /// Middlewares start executor
struct FinishingMiddlewares<S> { struct FinishingMiddlewares<S, H> {
resp: HttpResponse, resp: HttpResponse,
fut: Option<Box<Future<Item=(), Error=Error>>>, fut: Option<Box<Future<Item=(), Error=Error>>>,
_s: PhantomData<S>, _s: PhantomData<S>,
_h: PhantomData<H>,
} }
impl<S> FinishingMiddlewares<S> { impl<S, H> FinishingMiddlewares<S, H> {
fn init(info: &mut PipelineInfo<S>, resp: HttpResponse) -> PipelineState<S> { fn init(info: &mut PipelineInfo<S>, resp: HttpResponse) -> PipelineState<S, H> {
if info.count == 0 { if info.count == 0 {
Completed::init(info) Completed::init(info)
} else { } else {
match (FinishingMiddlewares{resp: resp, fut: None, _s: PhantomData}).poll(info) { match (FinishingMiddlewares{resp: resp, fut: None,
_s: PhantomData, _h: PhantomData}).poll(info) {
Ok(st) | Err(st) => st, Ok(st) | Err(st) => st,
} }
} }
} }
fn poll(mut self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S>, PipelineState<S>> { fn poll(mut self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S, H>, PipelineState<S, H>>
{
loop { loop {
// poll latest fut // poll latest fut
let not_ready = if let Some(ref mut fut) = self.fut { let not_ready = if let Some(ref mut fut) = self.fut {
@ -782,24 +803,26 @@ impl<S> FinishingMiddlewares<S> {
} }
} }
struct Completed<S>(PhantomData<S>); struct Completed<S, H>(PhantomData<S>, PhantomData<H>);
impl<S> Completed<S> { impl<S, H> Completed<S, H> {
#[inline] #[inline]
fn init(info: &mut PipelineInfo<S>) -> PipelineState<S> { fn init(info: &mut PipelineInfo<S>) -> PipelineState<S, H> {
if info.context.is_none() { if info.context.is_none() {
PipelineState::None PipelineState::None
} else { } else {
PipelineState::Completed(Completed(PhantomData)) PipelineState::Completed(Completed(PhantomData, PhantomData))
} }
} }
#[inline] #[inline]
fn poll(self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S>, PipelineState<S>> { fn poll(self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S, H>, PipelineState<S, H>> {
match info.poll_context() { match info.poll_context() {
Ok(Async::NotReady) => Ok(PipelineState::Completed(Completed(PhantomData))), Ok(Async::NotReady) =>
Ok(Async::Ready(())) => Ok(PipelineState::None), Ok(PipelineState::Completed(Completed(PhantomData, PhantomData))),
Ok(Async::Ready(())) =>
Ok(PipelineState::None),
Err(_) => Ok(PipelineState::Error), Err(_) => Ok(PipelineState::Error),
} }
} }
@ -813,11 +836,11 @@ mod tests {
use tokio_core::reactor::Core; use tokio_core::reactor::Core;
use futures::future::{lazy, result}; use futures::future::{lazy, result};
impl<S> PipelineState<S> { impl<S, H> PipelineState<S, H> {
fn is_none(&self) -> Option<bool> { fn is_none(&self) -> Option<bool> {
if let PipelineState::None = *self { Some(true) } else { None } if let PipelineState::None = *self { Some(true) } else { None }
} }
fn completed(self) -> Option<Completed<S>> { fn completed(self) -> Option<Completed<S, H>> {
if let PipelineState::Completed(c) = self { Some(c) } else { None } if let PipelineState::Completed(c) = self { Some(c) } else { None }
} }
} }
@ -831,14 +854,14 @@ mod tests {
fn test_completed() { fn test_completed() {
Core::new().unwrap().run(lazy(|| { Core::new().unwrap().run(lazy(|| {
let mut info = PipelineInfo::new(HttpRequest::default()); let mut info = PipelineInfo::new(HttpRequest::default());
Completed::init(&mut info).is_none().unwrap(); Completed::<(), Inner<()>>::init(&mut info).is_none().unwrap();
let req = HttpRequest::default(); let req = HttpRequest::default();
let mut ctx = HttpContext::new(req.clone(), MyActor); let mut ctx = HttpContext::new(req.clone(), MyActor);
let addr: Address<_> = ctx.address(); let addr: Address<_> = ctx.address();
let mut info = PipelineInfo::new(req); let mut info = PipelineInfo::new(req);
info.context = Some(Box::new(ctx)); info.context = Some(Box::new(ctx));
let mut state = Completed::init(&mut info).completed().unwrap(); let mut state = Completed::<(), Inner<()>>::init(&mut info).completed().unwrap();
let st = state.poll(&mut info).ok().unwrap(); let st = state.poll(&mut info).ok().unwrap();
let pp = Pipeline(info, st); let pp = Pipeline(info, st);

View File

@ -16,6 +16,7 @@ pub struct Router(Rc<Inner>);
struct Inner { struct Inner {
prefix: String, prefix: String,
prefix_len: usize,
regset: RegexSet, regset: RegexSet,
named: HashMap<String, (Pattern, bool)>, named: HashMap<String, (Pattern, bool)>,
patterns: Vec<Pattern>, patterns: Vec<Pattern>,
@ -24,8 +25,9 @@ struct Inner {
impl Router { impl Router {
/// Create new router /// Create new router
pub fn new<S>(prefix: &str, map: HashMap<Pattern, Option<Resource<S>>>) pub fn new<S>(prefix: &str,
-> (Router, Vec<Resource<S>>) settings: ServerSettings,
map: HashMap<Pattern, Option<Resource<S>>>) -> (Router, Vec<Resource<S>>)
{ {
let prefix = prefix.trim().trim_right_matches('/').to_owned(); let prefix = prefix.trim().trim_right_matches('/').to_owned();
let mut named = HashMap::new(); let mut named = HashMap::new();
@ -46,16 +48,14 @@ impl Router {
} }
} }
let len = prefix.len();
(Router(Rc::new( (Router(Rc::new(
Inner{ prefix: prefix, Inner{ prefix: prefix,
prefix_len: len,
regset: RegexSet::new(&paths).unwrap(), regset: RegexSet::new(&paths).unwrap(),
named: named, named: named,
patterns: patterns, patterns: patterns,
srv: ServerSettings::default() })), resources) srv: settings })), resources)
}
pub(crate) fn set_server_settings(&mut self, settings: ServerSettings) {
Rc::get_mut(&mut self.0).unwrap().srv = settings;
} }
/// Router prefix /// Router prefix
@ -74,7 +74,10 @@ impl Router {
pub fn recognize<S>(&self, req: &mut HttpRequest<S>) -> Option<usize> { pub fn recognize<S>(&self, req: &mut HttpRequest<S>) -> Option<usize> {
let mut idx = None; let mut idx = None;
{ {
let path = &req.path()[self.0.prefix.len()..]; if self.0.prefix_len > req.path().len() {
return None
}
let path = &req.path()[self.0.prefix_len..];
if path.is_empty() { if path.is_empty() {
if let Some(i) = self.0.regset.matches("/").into_iter().next() { if let Some(i) = self.0.regset.matches("/").into_iter().next() {
idx = Some(i); idx = Some(i);
@ -85,7 +88,7 @@ impl Router {
} }
if let Some(idx) = idx { if let Some(idx) = idx {
let path: &str = unsafe{ mem::transmute(&req.path()[self.0.prefix.len()..]) }; let path: &str = unsafe{ mem::transmute(&req.path()[self.0.prefix_len..]) };
self.0.patterns[idx].update_match_info(path, req); self.0.patterns[idx].update_match_info(path, req);
return Some(idx) return Some(idx)
} else { } else {
@ -94,13 +97,17 @@ impl Router {
} }
/// Check if application contains matching route. /// Check if application contains matching route.
///
/// This method does not take `prefix` into account.
/// For example if prefix is `/test` and router contains route `/name`,
/// following path would be recognizable `/test/name` but `has_route()` call
/// would return `false`.
pub fn has_route(&self, path: &str) -> bool { pub fn has_route(&self, path: &str) -> bool {
let p = &path[self.0.prefix.len()..]; if path.is_empty() {
if p.is_empty() {
if self.0.regset.matches("/").into_iter().next().is_some() { if self.0.regset.matches("/").into_iter().next().is_some() {
return true return true
} }
} else if self.0.regset.matches(p).into_iter().next().is_some() { } else if self.0.regset.matches(path).into_iter().next().is_some() {
return true return true
} }
false false
@ -208,12 +215,11 @@ impl Pattern {
{ {
let mut iter = elements.into_iter(); let mut iter = elements.into_iter();
let mut path = if let Some(prefix) = prefix { let mut path = if let Some(prefix) = prefix {
let mut path = String::from(prefix); format!("{}/", prefix)
path.push('/');
path
} else { } else {
String::new() String::new()
}; };
println!("TEST: {:?} {:?}", path, prefix);
for el in &self.elements { for el in &self.elements {
match *el { match *el {
PatternElement::Str(ref s) => path.push_str(s), PatternElement::Str(ref s) => path.push_str(s),
@ -300,11 +306,9 @@ impl Hash for Pattern {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use regex::Regex;
use super::*; use super::*;
use http::{Uri, Version, Method}; use regex::Regex;
use http::header::HeaderMap; use test::TestRequest;
use std::str::FromStr;
#[test] #[test]
fn test_recognizer() { fn test_recognizer() {
@ -315,47 +319,65 @@ mod tests {
routes.insert(Pattern::new("", "/v{val}/{val2}/index.html"), Some(Resource::default())); routes.insert(Pattern::new("", "/v{val}/{val2}/index.html"), Some(Resource::default()));
routes.insert(Pattern::new("", "/v/{tail:.*}"), Some(Resource::default())); routes.insert(Pattern::new("", "/v/{tail:.*}"), Some(Resource::default()));
routes.insert(Pattern::new("", "{test}/index.html"), Some(Resource::default())); routes.insert(Pattern::new("", "{test}/index.html"), Some(Resource::default()));
let (rec, _) = Router::new::<()>("", routes); let (rec, _) = Router::new::<()>("", ServerSettings::default(), routes);
let mut req = HttpRequest::new( let mut req = TestRequest::with_uri("/name").finish();
Method::GET, Uri::from_str("/name").unwrap(),
Version::HTTP_11, HeaderMap::new(), None);
assert!(rec.recognize(&mut req).is_some()); assert!(rec.recognize(&mut req).is_some());
assert!(req.match_info().is_empty()); assert!(req.match_info().is_empty());
let mut req = HttpRequest::new( let mut req = TestRequest::with_uri("/name/value").finish();
Method::GET, Uri::from_str("/name/value").unwrap(),
Version::HTTP_11, HeaderMap::new(), None);
assert!(rec.recognize(&mut req).is_some()); assert!(rec.recognize(&mut req).is_some());
assert_eq!(req.match_info().get("val").unwrap(), "value"); assert_eq!(req.match_info().get("val").unwrap(), "value");
assert_eq!(&req.match_info()["val"], "value"); assert_eq!(&req.match_info()["val"], "value");
let mut req = HttpRequest::new( let mut req = TestRequest::with_uri("/name/value2/index.html").finish();
Method::GET, Uri::from_str("/name/value2/index.html").unwrap(),
Version::HTTP_11, HeaderMap::new(), None);
assert!(rec.recognize(&mut req).is_some()); assert!(rec.recognize(&mut req).is_some());
assert_eq!(req.match_info().get("val").unwrap(), "value2"); assert_eq!(req.match_info().get("val").unwrap(), "value2");
let mut req = HttpRequest::new( let mut req = TestRequest::with_uri("/vtest/ttt/index.html").finish();
Method::GET, Uri::from_str("/vtest/ttt/index.html").unwrap(),
Version::HTTP_11, HeaderMap::new(), None);
assert!(rec.recognize(&mut req).is_some()); assert!(rec.recognize(&mut req).is_some());
assert_eq!(req.match_info().get("val").unwrap(), "test"); assert_eq!(req.match_info().get("val").unwrap(), "test");
assert_eq!(req.match_info().get("val2").unwrap(), "ttt"); assert_eq!(req.match_info().get("val2").unwrap(), "ttt");
let mut req = HttpRequest::new( let mut req = TestRequest::with_uri("/v/blah-blah/index.html").finish();
Method::GET, Uri::from_str("/v/blah-blah/index.html").unwrap(),
Version::HTTP_11, HeaderMap::new(), None);
assert!(rec.recognize(&mut req).is_some()); assert!(rec.recognize(&mut req).is_some());
assert_eq!(req.match_info().get("tail").unwrap(), "blah-blah/index.html"); assert_eq!(req.match_info().get("tail").unwrap(), "blah-blah/index.html");
let mut req = HttpRequest::new( let mut req = TestRequest::with_uri("/bbb/index.html").finish();
Method::GET, Uri::from_str("/bbb/index.html").unwrap(),
Version::HTTP_11, HeaderMap::new(), None);
assert!(rec.recognize(&mut req).is_some()); assert!(rec.recognize(&mut req).is_some());
assert_eq!(req.match_info().get("test").unwrap(), "bbb"); assert_eq!(req.match_info().get("test").unwrap(), "bbb");
} }
#[test]
fn test_recognizer_with_prefix() {
let mut routes = HashMap::new();
routes.insert(Pattern::new("", "/name"), Some(Resource::default()));
routes.insert(Pattern::new("", "/name/{val}"), Some(Resource::default()));
let (rec, _) = Router::new::<()>("/test", ServerSettings::default(), routes);
let mut req = TestRequest::with_uri("/name").finish();
assert!(rec.recognize(&mut req).is_none());
let mut req = TestRequest::with_uri("/test/name").finish();
assert!(rec.recognize(&mut req).is_some());
let mut req = TestRequest::with_uri("/test/name/value").finish();
assert!(rec.recognize(&mut req).is_some());
assert_eq!(req.match_info().get("val").unwrap(), "value");
assert_eq!(&req.match_info()["val"], "value");
// same patterns
let mut routes = HashMap::new();
routes.insert(Pattern::new("", "/name"), Some(Resource::default()));
routes.insert(Pattern::new("", "/name/{val}"), Some(Resource::default()));
let (rec, _) = Router::new::<()>("/test2", ServerSettings::default(), routes);
let mut req = TestRequest::with_uri("/name").finish();
assert!(rec.recognize(&mut req).is_none());
let mut req = TestRequest::with_uri("/test2/name").finish();
assert!(rec.recognize(&mut req).is_some());
}
fn assert_parse(pattern: &str, expected_re: &str) -> Regex { fn assert_parse(pattern: &str, expected_re: &str) -> Regex {
let (re_str, _) = Pattern::parse(pattern); let (re_str, _) = Pattern::parse(pattern);
assert_eq!(&*re_str, expected_re); assert_eq!(&*re_str, expected_re);

View File

@ -1,6 +1,5 @@
use std::{io, net, thread}; use std::{io, net, thread};
use std::rc::Rc; use std::rc::Rc;
use std::cell::{RefCell, RefMut};
use std::sync::{Arc, mpsc as sync_mpsc}; use std::sync::{Arc, mpsc as sync_mpsc};
use std::time::Duration; use std::time::Duration;
use std::marker::PhantomData; use std::marker::PhantomData;
@ -8,33 +7,32 @@ use std::collections::HashMap;
use actix::dev::*; use actix::dev::*;
use actix::System; use actix::System;
use futures::Stream; use futures::{Future, Sink, Stream};
use futures::sync::mpsc; use futures::sync::mpsc;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_core::reactor::Handle;
use tokio_core::net::TcpStream; use tokio_core::net::TcpStream;
use mio; use mio;
use num_cpus; use num_cpus;
use net2::{TcpBuilder, TcpStreamExt}; use net2::TcpBuilder;
#[cfg(feature="tls")]
use futures::{future, Future};
#[cfg(feature="tls")] #[cfg(feature="tls")]
use native_tls::TlsAcceptor; use native_tls::TlsAcceptor;
#[cfg(feature="tls")] #[cfg(feature="tls")]
use tokio_tls::{TlsStream, TlsAcceptorExt}; use tokio_tls::TlsStream;
#[cfg(feature="alpn")] #[cfg(feature="alpn")]
use futures::{future, Future}; use openssl::ssl::{SslMethod, SslAcceptorBuilder};
#[cfg(feature="alpn")]
use openssl::ssl::{SslMethod, SslAcceptor, SslAcceptorBuilder};
#[cfg(feature="alpn")] #[cfg(feature="alpn")]
use openssl::pkcs12::ParsedPkcs12; use openssl::pkcs12::ParsedPkcs12;
#[cfg(feature="alpn")] #[cfg(feature="alpn")]
use tokio_openssl::{SslStream, SslAcceptorExt}; use tokio_openssl::SslStream;
#[cfg(feature="signal")]
use actix::actors::signal;
use helpers; use helpers;
use channel::{HttpChannel, HttpHandler, IntoHttpHandler}; use channel::{HttpChannel, HttpHandler, IntoHttpHandler};
use worker::{Conn, Worker, WorkerSettings, StreamHandlerType, StopWorker};
/// Various server settings /// Various server settings
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -108,7 +106,8 @@ pub struct HttpServer<T, A, H, U>
workers: Vec<SyncAddress<Worker<H>>>, workers: Vec<SyncAddress<Worker<H>>>,
sockets: HashMap<net::SocketAddr, net::TcpListener>, sockets: HashMap<net::SocketAddr, net::TcpListener>,
accept: Vec<(mio::SetReadiness, sync_mpsc::Sender<Command>)>, accept: Vec<(mio::SetReadiness, sync_mpsc::Sender<Command>)>,
spawned: bool, exit: bool,
shutdown_timeout: u16,
} }
unsafe impl<T, A, H, U> Sync for HttpServer<T, A, H, U> where H: 'static {} unsafe impl<T, A, H, U> Sync for HttpServer<T, A, H, U> where H: 'static {}
@ -152,7 +151,8 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
workers: Vec::new(), workers: Vec::new(),
sockets: HashMap::new(), sockets: HashMap::new(),
accept: Vec::new(), accept: Vec::new(),
spawned: false, exit: false,
shutdown_timeout: 30,
} }
} }
@ -202,6 +202,27 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
self self
} }
#[cfg(feature="signal")]
/// Send `SystemExit` message to actix system
///
/// `SystemExit` message stops currently running system arbiter and all
/// nested arbiters.
pub fn system_exit(mut self) -> Self {
self.exit = true;
self
}
/// Timeout for graceful workers shutdown.
///
/// After receiving a stop signal, workers have this much time to finish serving requests.
/// Workers still alive after the timeout are force dropped.
///
/// By default shutdown timeout sets to 30 seconds.
pub fn shutdown_timeout(mut self, sec: u16) -> Self {
self.shutdown_timeout = sec;
self
}
/// Get addresses of bound sockets. /// Get addresses of bound sockets.
pub fn addrs(&self) -> Vec<net::SocketAddr> { pub fn addrs(&self) -> Vec<net::SocketAddr> {
self.sockets.keys().cloned().collect() self.sockets.keys().cloned().collect()
@ -235,23 +256,21 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
} }
fn start_workers(&mut self, settings: &ServerSettings, handler: &StreamHandlerType) fn start_workers(&mut self, settings: &ServerSettings, handler: &StreamHandlerType)
-> Vec<mpsc::UnboundedSender<IoStream<net::TcpStream>>> -> Vec<mpsc::UnboundedSender<Conn<net::TcpStream>>>
{ {
// start workers // start workers
let mut workers = Vec::new(); let mut workers = Vec::new();
for _ in 0..self.threads { for _ in 0..self.threads {
let s = settings.clone(); let s = settings.clone();
let (tx, rx) = mpsc::unbounded::<IoStream<net::TcpStream>>(); let (tx, rx) = mpsc::unbounded::<Conn<net::TcpStream>>();
let h = handler.clone(); let h = handler.clone();
let ka = self.keep_alive; let ka = self.keep_alive;
let factory = Arc::clone(&self.factory); let factory = Arc::clone(&self.factory);
let addr = Arbiter::start(move |ctx: &mut Context<_>| { let addr = Arbiter::start(move |ctx: &mut Context<_>| {
let mut apps: Vec<_> = (*factory)() let apps: Vec<_> = (*factory)()
.into_iter().map(|h| h.into_handler()).collect(); .into_iter()
for app in &mut apps { .map(|h| h.into_handler(s.clone())).collect();
app.server_settings(s.clone());
}
ctx.add_stream(rx); ctx.add_stream(rx);
Worker::new(apps, h, ka) Worker::new(apps, h, ka)
}); });
@ -337,11 +356,12 @@ impl<H: HttpHandler, U, V> HttpServer<TcpStream, net::SocketAddr, H, U>
/// .bind("127.0.0.1:0").expect("Can not bind to 127.0.0.1:0") /// .bind("127.0.0.1:0").expect("Can not bind to 127.0.0.1:0")
/// .spawn(); /// .spawn();
/// ///
/// let _ = addr.call_fut(dev::StopServer).wait(); // <- Send `StopServer` message to server. /// let _ = addr.call_fut(
/// dev::StopServer{graceful:true}).wait(); // <- Send `StopServer` message to server.
/// } /// }
/// ``` /// ```
pub fn spawn(mut self) -> SyncAddress<Self> { pub fn spawn(mut self) -> SyncAddress<Self> {
self.spawned = true; self.exit = true;
let (tx, rx) = sync_mpsc::channel(); let (tx, rx) = sync_mpsc::channel();
thread::spawn(move || { thread::spawn(move || {
@ -460,35 +480,62 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
// set server settings // set server settings
let addr: net::SocketAddr = "127.0.0.1:8080".parse().unwrap(); let addr: net::SocketAddr = "127.0.0.1:8080".parse().unwrap();
let settings = ServerSettings::new(Some(addr), &self.host, secure); let settings = ServerSettings::new(Some(addr), &self.host, secure);
let mut apps: Vec<_> = (*self.factory)().into_iter().map(|h| h.into_handler()).collect(); let apps: Vec<_> = (*self.factory)()
for app in &mut apps { .into_iter()
app.server_settings(settings.clone()); .map(|h| h.into_handler(settings.clone())).collect();
}
self.h = Some(Rc::new(WorkerSettings::new(apps, self.keep_alive))); self.h = Some(Rc::new(WorkerSettings::new(apps, self.keep_alive)));
// start server // start server
HttpServer::create(move |ctx| { HttpServer::create(move |ctx| {
ctx.add_stream(stream.map( ctx.add_stream(stream.map(
move |(t, _)| IoStream{io: t, peer: None, http2: false})); move |(t, _)| Conn{io: t, peer: None, http2: false}));
self self
}) })
} }
} }
#[derive(Message)] #[cfg(feature="signal")]
struct IoStream<T> { /// Unix Signals support
io: T, /// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and send `SystemExit(0)`
peer: Option<net::SocketAddr>, /// message to `System` actor.
http2: bool, impl<T, A, H, U> Handler<signal::Signal> for HttpServer<T, A, H, U>
where T: AsyncRead + AsyncWrite + 'static,
H: HttpHandler + 'static,
U: 'static,
A: 'static,
{
fn handle(&mut self, msg: signal::Signal, ctx: &mut Context<Self>)
-> Response<Self, signal::Signal>
{
match msg.0 {
signal::SignalType::Int => {
info!("SIGINT received, exiting");
self.exit = true;
Handler::<StopServer>::handle(self, StopServer{graceful: false}, ctx);
}
signal::SignalType::Term => {
info!("SIGTERM received, stopping");
self.exit = true;
Handler::<StopServer>::handle(self, StopServer{graceful: true}, ctx);
}
signal::SignalType::Quit => {
info!("SIGQUIT received, exiting");
self.exit = true;
Handler::<StopServer>::handle(self, StopServer{graceful: false}, ctx);
}
_ => (),
};
Self::empty()
}
} }
impl<T, A, H, U> StreamHandler<IoStream<T>, io::Error> for HttpServer<T, A, H, U> impl<T, A, H, U> StreamHandler<Conn<T>, io::Error> for HttpServer<T, A, H, U>
where T: AsyncRead + AsyncWrite + 'static, where T: AsyncRead + AsyncWrite + 'static,
H: HttpHandler + 'static, H: HttpHandler + 'static,
U: 'static, U: 'static,
A: 'static {} A: 'static {}
impl<T, A, H, U> Handler<IoStream<T>, io::Error> for HttpServer<T, A, H, U> impl<T, A, H, U> Handler<Conn<T>, io::Error> for HttpServer<T, A, H, U>
where T: AsyncRead + AsyncWrite + 'static, where T: AsyncRead + AsyncWrite + 'static,
H: HttpHandler + 'static, H: HttpHandler + 'static,
U: 'static, U: 'static,
@ -498,8 +545,7 @@ impl<T, A, H, U> Handler<IoStream<T>, io::Error> for HttpServer<T, A, H, U>
debug!("Error handling request: {}", err) debug!("Error handling request: {}", err)
} }
fn handle(&mut self, msg: IoStream<T>, _: &mut Context<Self>) fn handle(&mut self, msg: Conn<T>, _: &mut Context<Self>) -> Response<Self, Conn<T>>
-> Response<Self, IoStream<T>>
{ {
Arbiter::handle().spawn( Arbiter::handle().spawn(
HttpChannel::new(Rc::clone(self.h.as_ref().unwrap()), msg.io, msg.peer, msg.http2)); HttpChannel::new(Rc::clone(self.h.as_ref().unwrap()), msg.io, msg.peer, msg.http2));
@ -522,7 +568,9 @@ pub struct ResumeServer;
/// ///
/// If server starts with `spawn()` method, then spawned thread get terminated. /// If server starts with `spawn()` method, then spawned thread get terminated.
#[derive(Message)] #[derive(Message)]
pub struct StopServer; pub struct StopServer {
pub graceful: bool
}
impl<T, A, H, U> Handler<PauseServer> for HttpServer<T, A, H, U> impl<T, A, H, U> Handler<PauseServer> for HttpServer<T, A, H, U>
where T: AsyncRead + AsyncWrite + 'static, where T: AsyncRead + AsyncWrite + 'static,
@ -562,177 +610,50 @@ impl<T, A, H, U> Handler<StopServer> for HttpServer<T, A, H, U>
U: 'static, U: 'static,
A: 'static, A: 'static,
{ {
fn handle(&mut self, _: StopServer, ctx: &mut Context<Self>) -> Response<Self, StopServer> fn handle(&mut self, msg: StopServer, ctx: &mut Context<Self>) -> Response<Self, StopServer>
{ {
// stop accept threads
for item in &self.accept { for item in &self.accept {
let _ = item.1.send(Command::Stop); let _ = item.1.send(Command::Stop);
let _ = item.0.set_readiness(mio::Ready::readable()); let _ = item.0.set_readiness(mio::Ready::readable());
} }
ctx.stop();
// stop workers
let (tx, rx) = mpsc::channel(1);
let dur = if msg.graceful {
Some(Duration::new(u64::from(self.shutdown_timeout), 0))
} else {
None
};
for worker in &self.workers {
let tx2 = tx.clone();
let fut = worker.call(self, StopWorker{graceful: dur});
ActorFuture::then(fut, move |_, slf, _| {
slf.workers.pop();
if slf.workers.is_empty() {
let _ = tx2.send(());
// we need to stop system if server was spawned // we need to stop system if server was spawned
if self.spawned { if slf.exit {
Arbiter::system().send(msgs::SystemExit(0))
}
}
fut::ok(())
}).spawn(ctx);
}
if !self.workers.is_empty() {
Self::async_reply(
rx.into_future().map(|_| ()).map_err(|_| ()).actfuture())
} else {
// we need to stop system if server was spawned
if self.exit {
Arbiter::system().send(msgs::SystemExit(0)) Arbiter::system().send(msgs::SystemExit(0))
} }
Self::empty() Self::empty()
} }
} }
/// Http worker
///
/// Worker accepts Socket objects via unbounded channel and start requests processing.
struct Worker<H> {
h: Rc<WorkerSettings<H>>,
hnd: Handle,
handler: StreamHandlerType,
}
pub(crate) struct WorkerSettings<H> {
h: RefCell<Vec<H>>,
enabled: bool,
keep_alive: u64,
bytes: Rc<helpers::SharedBytesPool>,
messages: Rc<helpers::SharedMessagePool>,
}
impl<H> WorkerSettings<H> {
pub(crate) fn new(h: Vec<H>, keep_alive: Option<u64>) -> WorkerSettings<H> {
WorkerSettings {
h: RefCell::new(h),
enabled: if let Some(ka) = keep_alive { ka > 0 } else { false },
keep_alive: keep_alive.unwrap_or(0),
bytes: Rc::new(helpers::SharedBytesPool::new()),
messages: Rc::new(helpers::SharedMessagePool::new()),
}
}
pub fn handlers(&self) -> RefMut<Vec<H>> {
self.h.borrow_mut()
}
pub fn keep_alive(&self) -> u64 {
self.keep_alive
}
pub fn keep_alive_enabled(&self) -> bool {
self.enabled
}
pub fn get_shared_bytes(&self) -> helpers::SharedBytes {
helpers::SharedBytes::new(self.bytes.get_bytes(), Rc::clone(&self.bytes))
}
pub fn get_http_message(&self) -> helpers::SharedHttpMessage {
helpers::SharedHttpMessage::new(self.messages.get(), Rc::clone(&self.messages))
}
}
impl<H: 'static> Worker<H> {
fn new(h: Vec<H>, handler: StreamHandlerType, keep_alive: Option<u64>) -> Worker<H> {
Worker {
h: Rc::new(WorkerSettings::new(h, keep_alive)),
hnd: Arbiter::handle().clone(),
handler: handler,
}
}
fn update_time(&self, ctx: &mut Context<Self>) {
helpers::update_date();
ctx.run_later(Duration::new(1, 0), |slf, ctx| slf.update_time(ctx));
}
}
impl<H: 'static> Actor for Worker<H> {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
self.update_time(ctx);
}
}
impl<H> StreamHandler<IoStream<net::TcpStream>> for Worker<H>
where H: HttpHandler + 'static {}
impl<H> Handler<IoStream<net::TcpStream>> for Worker<H>
where H: HttpHandler + 'static,
{
fn handle(&mut self, msg: IoStream<net::TcpStream>, _: &mut Context<Self>)
-> Response<Self, IoStream<net::TcpStream>>
{
if !self.h.keep_alive_enabled() &&
msg.io.set_keepalive(Some(Duration::new(75, 0))).is_err()
{
error!("Can not set socket keep-alive option");
}
self.handler.handle(Rc::clone(&self.h), &self.hnd, msg);
Self::empty()
}
}
#[derive(Clone)]
enum StreamHandlerType {
Normal,
#[cfg(feature="tls")]
Tls(TlsAcceptor),
#[cfg(feature="alpn")]
Alpn(SslAcceptor),
}
impl StreamHandlerType {
fn handle<H: HttpHandler>(&mut self,
h: Rc<WorkerSettings<H>>,
hnd: &Handle,
msg: IoStream<net::TcpStream>) {
match *self {
StreamHandlerType::Normal => {
let io = TcpStream::from_stream(msg.io, hnd)
.expect("failed to associate TCP stream");
hnd.spawn(HttpChannel::new(h, io, msg.peer, msg.http2));
}
#[cfg(feature="tls")]
StreamHandlerType::Tls(ref acceptor) => {
let IoStream { io, peer, http2 } = msg;
let io = TcpStream::from_stream(io, hnd)
.expect("failed to associate TCP stream");
hnd.spawn(
TlsAcceptorExt::accept_async(acceptor, io).then(move |res| {
match res {
Ok(io) => Arbiter::handle().spawn(
HttpChannel::new(h, io, peer, http2)),
Err(err) =>
trace!("Error during handling tls connection: {}", err),
};
future::result(Ok(()))
})
);
}
#[cfg(feature="alpn")]
StreamHandlerType::Alpn(ref acceptor) => {
let IoStream { io, peer, .. } = msg;
let io = TcpStream::from_stream(io, hnd)
.expect("failed to associate TCP stream");
hnd.spawn(
SslAcceptorExt::accept_async(acceptor, io).then(move |res| {
match res {
Ok(io) => {
let http2 = if let Some(p) = io.get_ref().ssl().selected_alpn_protocol()
{
p.len() == 2 && &p == b"h2"
} else {
false
};
Arbiter::handle().spawn(HttpChannel::new(h, io, peer, http2));
},
Err(err) =>
trace!("Error during handling tls connection: {}", err),
};
future::result(Ok(()))
})
);
}
}
}
} }
enum Command { enum Command {
@ -742,7 +663,7 @@ enum Command {
} }
fn start_accept_thread(sock: net::TcpListener, addr: net::SocketAddr, backlog: i32, fn start_accept_thread(sock: net::TcpListener, addr: net::SocketAddr, backlog: i32,
workers: Vec<mpsc::UnboundedSender<IoStream<net::TcpStream>>>) workers: Vec<mpsc::UnboundedSender<Conn<net::TcpStream>>>)
-> (mio::SetReadiness, sync_mpsc::Sender<Command>) -> (mio::SetReadiness, sync_mpsc::Sender<Command>)
{ {
let (tx, rx) = sync_mpsc::channel(); let (tx, rx) = sync_mpsc::channel();
@ -793,7 +714,7 @@ fn start_accept_thread(sock: net::TcpListener, addr: net::SocketAddr, backlog: i
loop { loop {
match server.accept_std() { match server.accept_std() {
Ok((sock, addr)) => { Ok((sock, addr)) => {
let msg = IoStream{ let msg = Conn{
io: sock, peer: Some(addr), http2: false}; io: sock, peer: Some(addr), http2: false};
workers[next].unbounded_send(msg) workers[next].unbounded_send(msg)
.expect("worker thread died"); .expect("worker thread died");

View File

@ -16,7 +16,7 @@ use tokio_core::reactor::Core;
use net2::TcpBuilder; use net2::TcpBuilder;
use error::Error; use error::Error;
use server::HttpServer; use server::{HttpServer, ServerSettings};
use handler::{Handler, Responder, ReplyItem}; use handler::{Handler, Responder, ReplyItem};
use channel::{HttpHandler, IntoHttpHandler}; use channel::{HttpHandler, IntoHttpHandler};
use middleware::Middleware; use middleware::Middleware;
@ -199,8 +199,8 @@ impl<S: 'static> TestApp<S> {
impl<S: 'static> IntoHttpHandler for TestApp<S> { impl<S: 'static> IntoHttpHandler for TestApp<S> {
type Handler = HttpApplication<S>; type Handler = HttpApplication<S>;
fn into_handler(self) -> HttpApplication<S> { fn into_handler(mut self, settings: ServerSettings) -> HttpApplication<S> {
self.app.unwrap().finish() self.app.take().unwrap().into_handler(settings)
} }
} }
@ -347,7 +347,7 @@ impl<S> TestRequest<S> {
let req = HttpRequest::new(method, uri, version, headers, payload); let req = HttpRequest::new(method, uri, version, headers, payload);
req.as_mut().cookies = cookies; req.as_mut().cookies = cookies;
req.as_mut().params = params; req.as_mut().params = params;
let (router, _) = Router::new::<S>("/", HashMap::new()); let (router, _) = Router::new::<S>("/", ServerSettings::default(), HashMap::new());
req.with_state(Rc::new(state), router) req.with_state(Rc::new(state), router)
} }

255
src/worker.rs Normal file
View File

@ -0,0 +1,255 @@
use std::{net, time};
use std::rc::Rc;
use std::cell::{Cell, RefCell, RefMut};
use futures::Future;
use futures::unsync::oneshot;
use tokio_core::net::TcpStream;
use tokio_core::reactor::Handle;
use net2::TcpStreamExt;
#[cfg(feature="tls")]
use futures::future;
#[cfg(feature="tls")]
use native_tls::TlsAcceptor;
#[cfg(feature="tls")]
use tokio_tls::TlsAcceptorExt;
#[cfg(feature="alpn")]
use futures::future;
#[cfg(feature="alpn")]
use openssl::ssl::SslAcceptor;
#[cfg(feature="alpn")]
use tokio_openssl::SslAcceptorExt;
use actix::*;
use actix::msgs::StopArbiter;
use helpers;
use channel::{HttpChannel, HttpHandler};
#[derive(Message)]
pub(crate) struct Conn<T> {
pub io: T,
pub peer: Option<net::SocketAddr>,
pub http2: bool,
}
/// Stop worker message. Returns `true` on successful shutdown
/// and `false` if some connections still alive.
#[derive(Message)]
#[rtype(bool)]
pub(crate) struct StopWorker {
pub graceful: Option<time::Duration>,
}
pub(crate) struct WorkerSettings<H> {
h: RefCell<Vec<H>>,
enabled: bool,
keep_alive: u64,
bytes: Rc<helpers::SharedBytesPool>,
messages: Rc<helpers::SharedMessagePool>,
channels: Cell<usize>,
}
impl<H> WorkerSettings<H> {
pub(crate) fn new(h: Vec<H>, keep_alive: Option<u64>) -> WorkerSettings<H> {
WorkerSettings {
h: RefCell::new(h),
enabled: if let Some(ka) = keep_alive { ka > 0 } else { false },
keep_alive: keep_alive.unwrap_or(0),
bytes: Rc::new(helpers::SharedBytesPool::new()),
messages: Rc::new(helpers::SharedMessagePool::new()),
channels: Cell::new(0),
}
}
pub fn handlers(&self) -> RefMut<Vec<H>> {
self.h.borrow_mut()
}
pub fn keep_alive(&self) -> u64 {
self.keep_alive
}
pub fn keep_alive_enabled(&self) -> bool {
self.enabled
}
pub fn get_shared_bytes(&self) -> helpers::SharedBytes {
helpers::SharedBytes::new(self.bytes.get_bytes(), Rc::clone(&self.bytes))
}
pub fn get_http_message(&self) -> helpers::SharedHttpMessage {
helpers::SharedHttpMessage::new(self.messages.get(), Rc::clone(&self.messages))
}
pub fn add_channel(&self) {
self.channels.set(self.channels.get()+1);
}
pub fn remove_channel(&self) {
let num = self.channels.get();
if num > 0 {
self.channels.set(num-1);
} else {
error!("Number of removed channels is bigger than added channel. Bug in actix-web");
}
}
}
/// Http worker
///
/// Worker accepts Socket objects via unbounded channel and start requests processing.
pub(crate) struct Worker<H> {
h: Rc<WorkerSettings<H>>,
hnd: Handle,
handler: StreamHandlerType,
}
impl<H: 'static> Worker<H> {
pub(crate) fn new(h: Vec<H>, handler: StreamHandlerType, keep_alive: Option<u64>)
-> Worker<H>
{
Worker {
h: Rc::new(WorkerSettings::new(h, keep_alive)),
hnd: Arbiter::handle().clone(),
handler: handler,
}
}
fn update_time(&self, ctx: &mut Context<Self>) {
helpers::update_date();
ctx.run_later(time::Duration::new(1, 0), |slf, ctx| slf.update_time(ctx));
}
fn shutdown_timeout(&self, ctx: &mut Context<Self>,
tx: oneshot::Sender<bool>, dur: time::Duration) {
// sleep for 1 second and then check again
ctx.run_later(time::Duration::new(1, 0), move |slf, ctx| {
let num = slf.h.channels.get();
if num == 0 {
let _ = tx.send(true);
Arbiter::arbiter().send(StopArbiter(0));
} else if let Some(d) = dur.checked_sub(time::Duration::new(1, 0)) {
slf.shutdown_timeout(ctx, tx, d);
} else {
info!("Force shutdown http worker, {} connections", num);
let _ = tx.send(false);
Arbiter::arbiter().send(StopArbiter(0));
}
});
}
}
impl<H: 'static> Actor for Worker<H> {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
self.update_time(ctx);
}
}
impl<H> StreamHandler<Conn<net::TcpStream>> for Worker<H>
where H: HttpHandler + 'static {}
impl<H> Handler<Conn<net::TcpStream>> for Worker<H>
where H: HttpHandler + 'static,
{
fn handle(&mut self, msg: Conn<net::TcpStream>, _: &mut Context<Self>)
-> Response<Self, Conn<net::TcpStream>>
{
if !self.h.keep_alive_enabled() &&
msg.io.set_keepalive(Some(time::Duration::new(75, 0))).is_err()
{
error!("Can not set socket keep-alive option");
}
self.handler.handle(Rc::clone(&self.h), &self.hnd, msg);
Self::empty()
}
}
/// `StopWorker` message handler
impl<H> Handler<StopWorker> for Worker<H>
where H: HttpHandler + 'static,
{
fn handle(&mut self, msg: StopWorker, ctx: &mut Context<Self>) -> Response<Self, StopWorker>
{
let num = self.h.channels.get();
if num == 0 {
info!("Shutting down http worker, 0 connections");
Self::reply(true)
} else if let Some(dur) = msg.graceful {
info!("Graceful http worker shutdown, {} connections", num);
let (tx, rx) = oneshot::channel();
self.shutdown_timeout(ctx, tx, dur);
Self::async_reply(rx.map_err(|_| ()).actfuture())
} else {
info!("Force shutdown http worker, {} connections", num);
Self::reply(false)
}
}
}
#[derive(Clone)]
pub(crate) enum StreamHandlerType {
Normal,
#[cfg(feature="tls")]
Tls(TlsAcceptor),
#[cfg(feature="alpn")]
Alpn(SslAcceptor),
}
impl StreamHandlerType {
fn handle<H: HttpHandler>(&mut self,
h: Rc<WorkerSettings<H>>,
hnd: &Handle, msg: Conn<net::TcpStream>) {
match *self {
StreamHandlerType::Normal => {
let io = TcpStream::from_stream(msg.io, hnd)
.expect("failed to associate TCP stream");
hnd.spawn(HttpChannel::new(h, io, msg.peer, msg.http2));
}
#[cfg(feature="tls")]
StreamHandlerType::Tls(ref acceptor) => {
let Conn { io, peer, http2 } = msg;
let io = TcpStream::from_stream(io, hnd)
.expect("failed to associate TCP stream");
hnd.spawn(
TlsAcceptorExt::accept_async(acceptor, io).then(move |res| {
match res {
Ok(io) => Arbiter::handle().spawn(
HttpChannel::new(h, io, peer, http2)),
Err(err) =>
trace!("Error during handling tls connection: {}", err),
};
future::result(Ok(()))
})
);
}
#[cfg(feature="alpn")]
StreamHandlerType::Alpn(ref acceptor) => {
let Conn { io, peer, .. } = msg;
let io = TcpStream::from_stream(io, hnd)
.expect("failed to associate TCP stream");
hnd.spawn(
SslAcceptorExt::accept_async(acceptor, io).then(move |res| {
match res {
Ok(io) => {
let http2 = if let Some(p) = io.get_ref().ssl().selected_alpn_protocol()
{
p.len() == 2 && &p == b"h2"
} else {
false
};
Arbiter::handle().spawn(HttpChannel::new(h, io, peer, http2));
},
Err(err) =>
trace!("Error during handling tls connection: {}", err),
};
future::result(Ok(()))
})
);
}
}
}
}

View File

@ -4,7 +4,7 @@ extern crate tokio_core;
extern crate reqwest; extern crate reqwest;
extern crate futures; extern crate futures;
use std::{net, thread}; use std::{net, thread, time};
use std::sync::{Arc, mpsc}; use std::sync::{Arc, mpsc};
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use futures::Future; use futures::Future;
@ -24,7 +24,7 @@ fn test_start() {
.resource("/", |r| r.method(Method::GET).h(httpcodes::HTTPOk))]); .resource("/", |r| r.method(Method::GET).h(httpcodes::HTTPOk))]);
let srv = srv.bind("127.0.0.1:0").unwrap(); let srv = srv.bind("127.0.0.1:0").unwrap();
let addr = srv.addrs()[0].clone(); let addr = srv.addrs()[0];
let srv_addr = srv.start(); let srv_addr = srv.start();
let _ = tx.send((addr, srv_addr)); let _ = tx.send((addr, srv_addr));
sys.run(); sys.run();
@ -34,6 +34,7 @@ fn test_start() {
// pause // pause
let _ = srv_addr.call_fut(dev::PauseServer).wait(); let _ = srv_addr.call_fut(dev::PauseServer).wait();
thread::sleep(time::Duration::from_millis(100));
assert!(net::TcpStream::connect(addr).is_err()); assert!(net::TcpStream::connect(addr).is_err());
// resume // resume